Module agents.pipeline

Classes

class Pipeline
Expand source code
class Pipeline(EventEmitter[Literal["start"]], ABC):
    """
    Base Pipeline class that other pipeline types (RealTime, Cascading) will inherit from.
    Inherits from EventEmitter to provide event handling capabilities.
    """
    
    def __init__(self) -> None:
        """Initialize the pipeline with event emitter capabilities"""
        super().__init__()
        self.loop: asyncio.AbstractEventLoop | None = None
        self.audio_track: CustomAudioStreamTrack | None = None
        self._wake_up_callback: Optional[Callable[[], None]] = None
        self._auto_register()
        
    def _auto_register(self) -> None:
        """Internal Method: Automatically register this pipeline with the current job context"""
        try:
            from .job import get_current_job_context
            job_context = get_current_job_context()
            if job_context:
                job_context._set_pipeline_internal(self)
        except ImportError:
            pass

    def _set_loop_and_audio_track(self, loop: asyncio.AbstractEventLoop, audio_track: CustomAudioStreamTrack) -> None:
        """Internal Method: Set the event loop and configure components"""
        self.loop = loop
        self.audio_track = audio_track
        self._configure_components()

    def _configure_components(self) -> None:
        """Internal Method: Configure pipeline components with the loop - to be overridden by subclasses"""
        pass

    def set_wake_up_callback(self, callback: Callable[[], None]) -> None:
        self._wake_up_callback = callback

    def _notify_speech_started(self) -> None:
        if self._wake_up_callback:
            self._wake_up_callback()

    @abstractmethod
    async def start(self, **kwargs: Any) -> None:
        """
        Start the pipeline processing.
        This is an abstract method that must be implemented by child classes.
        
        Args:
            **kwargs: Additional arguments that may be needed by specific pipeline implementations
        """
        pass
    
    @abstractmethod
    async def on_audio_delta(self, audio_data: bytes) -> None:
        """
        Handle incoming audio data from the user
        """
        pass
    
    @abstractmethod
    async def send_message(self, message: str) -> None:
        """
        Send a message to the pipeline.
        """
        pass
    
    async def cleanup(self) -> None:
        """
        Cleanup pipeline resources.
        Base implementation - subclasses should override and call super().cleanup()
        """
        self.loop = None
        self.audio_track = None
        self._wake_up_callback = None
        logger.info("Pipeline cleaned up")
    
    async def leave(self) -> None:
        """
        Leave the pipeline.
        Base implementation - subclasses should override if needed.
        """
        logger.info("Leaving pipeline")
        await self.cleanup()

Base Pipeline class that other pipeline types (RealTime, Cascading) will inherit from. Inherits from EventEmitter to provide event handling capabilities.

Initialize the pipeline with event emitter capabilities

Ancestors

Subclasses

Methods

async def cleanup(self) ‑> None
Expand source code
async def cleanup(self) -> None:
    """
    Cleanup pipeline resources.
    Base implementation - subclasses should override and call super().cleanup()
    """
    self.loop = None
    self.audio_track = None
    self._wake_up_callback = None
    logger.info("Pipeline cleaned up")

Cleanup pipeline resources. Base implementation - subclasses should override and call super().cleanup()

async def leave(self) ‑> None
Expand source code
async def leave(self) -> None:
    """
    Leave the pipeline.
    Base implementation - subclasses should override if needed.
    """
    logger.info("Leaving pipeline")
    await self.cleanup()

Leave the pipeline. Base implementation - subclasses should override if needed.

async def on_audio_delta(self, audio_data: bytes) ‑> None
Expand source code
@abstractmethod
async def on_audio_delta(self, audio_data: bytes) -> None:
    """
    Handle incoming audio data from the user
    """
    pass

Handle incoming audio data from the user

async def send_message(self, message: str) ‑> None
Expand source code
@abstractmethod
async def send_message(self, message: str) -> None:
    """
    Send a message to the pipeline.
    """
    pass

Send a message to the pipeline.

def set_wake_up_callback(self, callback: Callable[[], None]) ‑> None
Expand source code
def set_wake_up_callback(self, callback: Callable[[], None]) -> None:
    self._wake_up_callback = callback
async def start(self, **kwargs: Any) ‑> None
Expand source code
@abstractmethod
async def start(self, **kwargs: Any) -> None:
    """
    Start the pipeline processing.
    This is an abstract method that must be implemented by child classes.
    
    Args:
        **kwargs: Additional arguments that may be needed by specific pipeline implementations
    """
    pass

Start the pipeline processing. This is an abstract method that must be implemented by child classes.

Args

**kwargs
Additional arguments that may be needed by specific pipeline implementations