Module agents.pipeline
Classes
class Pipeline-
Expand source code
class Pipeline(EventEmitter[Literal["start"]], ABC):
    """
    Abstract base for the concrete pipeline types (RealTime, Cascading).

    Extends EventEmitter so that every pipeline gets event handling
    capabilities.
    """

    def __init__(self) -> None:
        """Set up the event emitter and the pipeline's initial state."""
        super().__init__()
        # Loop and audio track stay unset until _set_loop_and_audio_track() is called.
        self.loop: asyncio.AbstractEventLoop | None = None
        self.audio_track: CustomAudioStreamTrack | None = None
        self._wake_up_callback: Optional[Callable[[], None]] = None
        # get_latest_frames() reads from the tail of this list and returns
        # at most _max_frames_buffer entries.
        self._recent_frames: list[av.VideoFrame] = []
        self._max_frames_buffer = 5
        self._auto_register()

    def _auto_register(self) -> None:
        """Internal Method: Register this pipeline with the current job context, if there is one"""
        try:
            from .job import get_current_job_context

            ctx = get_current_job_context()
            if not ctx:
                return
            ctx._set_pipeline_internal(self)
        except ImportError:
            # Registration is best effort; an ImportError is deliberately ignored.
            return

    def _set_loop_and_audio_track(self, loop: asyncio.AbstractEventLoop, audio_track: CustomAudioStreamTrack) -> None:
        """Internal Method: Store the event loop and audio track, then configure components"""
        self.loop, self.audio_track = loop, audio_track
        self._configure_components()

    def _configure_components(self) -> None:
        """Internal Method: Hook for subclasses to configure their components with the loop (no-op here)"""

    def set_wake_up_callback(self, callback: Callable[[], None]) -> None:
        """Store the callback that _notify_speech_started() invokes."""
        self._wake_up_callback = callback

    def _notify_speech_started(self) -> None:
        """Invoke the wake-up callback, if one has been set."""
        wake_up = self._wake_up_callback
        if not wake_up:
            return
        wake_up()

    @abstractmethod
    async def start(self, **kwargs: Any) -> None:
        """
        Start the pipeline processing.

        Abstract: every child class must provide its own implementation.

        Args:
            **kwargs: Additional arguments that may be needed by specific pipeline implementations
        """

    @abstractmethod
    async def on_audio_delta(self, audio_data: bytes) -> None:
        """
        Handle incoming audio data from the user
        """

    @abstractmethod
    async def on_video_delta(self, video_data: av.VideoFrame) -> None:
        """
        Handle incoming video data from the user
        """

    @abstractmethod
    async def send_message(self, message: str, handle: UtteranceHandle) -> None:
        """
        Send a message to the pipeline.
        """

    async def cleanup(self) -> None:
        """
        Release the resources held by the pipeline.

        Base implementation - subclasses should override and call super().cleanup()
        """
        # Drop the references to the loop, the audio track and the callback.
        self.loop = self.audio_track = self._wake_up_callback = None
        self._recent_frames = []
        logger.info("Pipeline cleaned up")

    async def leave(self) -> None:
        """
        Leave the pipeline by logging the departure and running cleanup().

        Base implementation - subclasses should override if needed.
        """
        logger.info("Leaving pipeline")
        await self.cleanup()

    def get_latest_frames(self, num_frames: int = 1) -> list[av.VideoFrame]:
        """
        Get the latest video frames from the pipeline.

        Args:
            num_frames: Number of frames to retrieve (default: 1). Values
                below 1 are raised to 1 and values above the frame buffer
                size (5) are lowered to it.

        Returns:
            List of VideoFrame objects. Returns empty list if vision is not
            enabled or no frames available.
        """
        vision_enabled = getattr(self, 'vision', False)
        if not vision_enabled:
            logger.warning("Vision is not enabled in pipeline, please enable vision in RoomOptions.")
            return []

        # Clamp the request: at least one frame, at most the buffer size.
        count = 1 if num_frames < 1 else min(num_frames, self._max_frames_buffer)

        frames = self._recent_frames
        return frames[-count:] if frames else []
Initialize the pipeline with event emitter capabilities
Ancestors
- EventEmitter
- typing.Generic
- abc.ABC
Subclasses
Methods
async def cleanup(self) ‑> None-
Expand source code
async def cleanup(self) -> None:
    """
    Release the resources held by the pipeline.

    Base implementation - subclasses should override and call super().cleanup()
    """
    # Drop the references to the loop, the audio track and the callback.
    self.loop = self.audio_track = self._wake_up_callback = None
    self._recent_frames = []
    logger.info("Pipeline cleaned up")
def get_latest_frames(self, num_frames: int = 1) ‑> list[av.video.frame.VideoFrame]-
Expand source code
def get_latest_frames(self, num_frames: int = 1) -> list[av.VideoFrame]:
    """
    Get the latest video frames from the pipeline.

    Args:
        num_frames: Number of frames to retrieve (default: 1). Values
            below 1 are raised to 1 and values above the frame buffer
            size (5) are lowered to it.

    Returns:
        List of VideoFrame objects. Returns empty list if vision is not
        enabled or no frames available.
    """
    vision_enabled = getattr(self, 'vision', False)
    if not vision_enabled:
        logger.warning("Vision is not enabled in pipeline, please enable vision in RoomOptions.")
        return []

    # Clamp the request: at least one frame, at most the buffer size.
    count = 1 if num_frames < 1 else min(num_frames, self._max_frames_buffer)

    frames = self._recent_frames
    return frames[-count:] if frames else []
Args
num_frames: Number of frames to retrieve (default: 1, max: 5)
Returns
List of VideoFrame objects. Returns empty list if vision is not enabled or no frames available.
async def leave(self) ‑> None-
Expand source code
async def leave(self) -> None:
    """
    Leave the pipeline by logging the departure and running cleanup().

    Base implementation - subclasses should override if needed.
    """
    logger.info("Leaving pipeline")
    await self.cleanup()
async def on_audio_delta(self, audio_data: bytes) ‑> None-
Expand source code
@abstractmethod
async def on_audio_delta(self, audio_data: bytes) -> None:
    """
    Handle incoming audio data from the user.

    Abstract hook: the base body does nothing and returns None.
    """
async def on_video_delta(self, video_data: av.video.frame.VideoFrame) ‑> None-
Expand source code
@abstractmethod
async def on_video_delta(self, video_data: av.VideoFrame) -> None:
    """
    Handle incoming video data from the user.

    Abstract hook: the base body does nothing and returns None.
    """
async def send_message(self,
message: str,
handle: UtteranceHandle) ‑> None-
Expand source code
@abstractmethod
async def send_message(self, message: str, handle: UtteranceHandle) -> None:
    """
    Send a message to the pipeline.

    Abstract hook: the base body does nothing and returns None.
    """
def set_wake_up_callback(self, callback: Callable[[], None]) ‑> None-
Expand source code
def set_wake_up_callback(self, callback: Callable[[], None]) -> None: self._wake_up_callback = callback
async def start(self, **kwargs: Any) ‑> None-
Expand source code
@abstractmethod
async def start(self, **kwargs: Any) -> None:
    """
    Start the pipeline processing.

    Abstract: every child class must provide its own implementation; the
    base body does nothing and returns None.

    Args:
        **kwargs: Additional arguments that may be needed by specific pipeline implementations
    """
Args
**kwargs: Additional arguments that may be needed by specific pipeline implementations