Module agents.playground_manager

Classes

class PlaygroundManager (ctx)
Expand source code
class PlaygroundManager:
    def __init__(self, ctx):
        self.job_context = ctx
        self.job_context.playground_manager = self

    def send_cascading_metrics(self, metrics: dict, full_turn_data: bool = False):
        """Sends cascading metrics to the playground.
            Args:
                metrics (dict): The metrics to send.
                full_turn_data (bool): Whether to send full turn data.
        """
        if full_turn_data:
            metrics = asdict(metrics)
        metrics = json.dumps(metrics)
        publish_config = PubSubPublishConfig(
            topic="AGENT_METRICS",
            message={"type": "cascading", "metrics": metrics, "full_turn_data": full_turn_data}
        )

        if self.job_context.room:
            asyncio.create_task(self.job_context.room.publish_to_pubsub(publish_config))
        else:
            logger.debug("Cannot send cascading metrics: room is not available")

    def send_realtime_metrics(self, metrics: dict, full_turn_data: bool = False):
        """Sends realtime metrics to the playground.
            Args:
                metrics (dict): The metrics to send.
                full_turn_data (bool): Whether to send full turn data.
        """
        if full_turn_data:
            metrics = asdict(metrics)
        metrics = json.dumps(metrics)
        publish_config = PubSubPublishConfig(
            topic="AGENT_METRICS",
            message={"type": "realtime", "metrics": metrics, "full_turn_data": full_turn_data}
        )

        if self.job_context.room:
            asyncio.create_task(self.job_context.room.publish_to_pubsub(publish_config))

Methods

def send_cascading_metrics(self, metrics: dict, full_turn_data: bool = False)
Expand source code
def send_cascading_metrics(self, metrics: dict, full_turn_data: bool = False):
    """Sends cascading metrics to the playground.
        Args:
            metrics (dict): The metrics to send.
            full_turn_data (bool): Whether to send full turn data.
    """
    if full_turn_data:
        metrics = asdict(metrics)
    metrics = json.dumps(metrics)
    publish_config = PubSubPublishConfig(
        topic="AGENT_METRICS",
        message={"type": "cascading", "metrics": metrics, "full_turn_data": full_turn_data}
    )

    if self.job_context.room:
        asyncio.create_task(self.job_context.room.publish_to_pubsub(publish_config))
    else:
        logger.debug("Cannot send cascading metrics: room is not available")

Sends cascading metrics to the playground.

Args

metrics : dict
The metrics to send.
full_turn_data : bool
Whether to send full turn data.
def send_realtime_metrics(self, metrics: dict, full_turn_data: bool = False)
Expand source code
def send_realtime_metrics(self, metrics: dict, full_turn_data: bool = False):
    """Sends realtime metrics to the playground.
        Args:
            metrics (dict): The metrics to send.
            full_turn_data (bool): Whether to send full turn data.
    """
    if full_turn_data:
        metrics = asdict(metrics)
    metrics = json.dumps(metrics)
    publish_config = PubSubPublishConfig(
        topic="AGENT_METRICS",
        message={"type": "realtime", "metrics": metrics, "full_turn_data": full_turn_data}
    )

    if self.job_context.room:
        asyncio.create_task(self.job_context.room.publish_to_pubsub(publish_config))

Sends realtime metrics to the playground.

Args

metrics : dict
The metrics to send.
full_turn_data : bool
Whether to send full turn data.