Module agents.pipeline
Classes
class Pipeline-
Expand source code
class Pipeline(EventEmitter[Literal["start"]], ABC): """ Base Pipeline class that other pipeline types (RealTime, Cascading) will inherit from. Inherits from EventEmitter to provide event handling capabilities. """ def __init__(self) -> None: """Initialize the pipeline with event emitter capabilities""" super().__init__() self.loop: asyncio.AbstractEventLoop | None = None self.audio_track: CustomAudioStreamTrack | None = None self._wake_up_callback: Optional[Callable[[], None]] = None self._auto_register() def _auto_register(self) -> None: """Internal Method: Automatically register this pipeline with the current job context""" try: from .job import get_current_job_context job_context = get_current_job_context() if job_context: job_context._set_pipeline_internal(self) except ImportError: pass def _set_loop_and_audio_track(self, loop: asyncio.AbstractEventLoop, audio_track: CustomAudioStreamTrack) -> None: """Internal Method: Set the event loop and configure components""" self.loop = loop self.audio_track = audio_track self._configure_components() def _configure_components(self) -> None: """Internal Method: Configure pipeline components with the loop - to be overridden by subclasses""" pass def set_wake_up_callback(self, callback: Callable[[], None]) -> None: self._wake_up_callback = callback def _notify_speech_started(self) -> None: if self._wake_up_callback: self._wake_up_callback() @abstractmethod async def start(self, **kwargs: Any) -> None: """ Start the pipeline processing. This is an abstract method that must be implemented by child classes. Args: **kwargs: Additional arguments that may be needed by specific pipeline implementations """ pass @abstractmethod async def on_audio_delta(self, audio_data: bytes) -> None: """ Handle incoming audio data from the user """ pass @abstractmethod async def send_message(self, message: str) -> None: """ Send a message to the pipeline. """ pass async def cleanup(self) -> None: """ Cleanup pipeline resources. Base implementation - subclasses should override and call super().cleanup() """ self.loop = None self.audio_track = None self._wake_up_callback = None logger.info("Pipeline cleaned up") async def leave(self) -> None: """ Leave the pipeline. Base implementation - subclasses should override if needed. """ logger.info("Leaving pipeline") await self.cleanup()Base Pipeline class that other pipeline types (RealTime, Cascading) will inherit from. Inherits from EventEmitter to provide event handling capabilities.
Initialize the pipeline with event emitter capabilities
Ancestors
- EventEmitter
- typing.Generic
- abc.ABC
Subclasses
Methods
async def cleanup(self) ‑> None-
Expand source code
async def cleanup(self) -> None: """ Cleanup pipeline resources. Base implementation - subclasses should override and call super().cleanup() """ self.loop = None self.audio_track = None self._wake_up_callback = None logger.info("Pipeline cleaned up")Cleanup pipeline resources. Base implementation - subclasses should override and call super().cleanup()
async def leave(self) ‑> None-
Expand source code
async def leave(self) -> None: """ Leave the pipeline. Base implementation - subclasses should override if needed. """ logger.info("Leaving pipeline") await self.cleanup()Leave the pipeline. Base implementation - subclasses should override if needed.
async def on_audio_delta(self, audio_data: bytes) ‑> None-
Expand source code
@abstractmethod async def on_audio_delta(self, audio_data: bytes) -> None: """ Handle incoming audio data from the user """ passHandle incoming audio data from the user
async def send_message(self, message: str) ‑> None-
Expand source code
@abstractmethod async def send_message(self, message: str) -> None: """ Send a message to the pipeline. """ passSend a message to the pipeline.
def set_wake_up_callback(self, callback: Callable[[], None]) ‑> None-
Expand source code
def set_wake_up_callback(self, callback: Callable[[], None]) -> None: self._wake_up_callback = callback async def start(self, **kwargs: Any) ‑> None-
Expand source code
@abstractmethod async def start(self, **kwargs: Any) -> None: """ Start the pipeline processing. This is an abstract method that must be implemented by child classes. Args: **kwargs: Additional arguments that may be needed by specific pipeline implementations """ passStart the pipeline processing. This is an abstract method that must be implemented by child classes.
Args
**kwargs- Additional arguments that may be needed by specific pipeline implementations