Module agents.debug
Sub-modules
agents.debug.http_serveragents.debug.tracing
Classes
class HttpServer (host: str = '0.0.0.0',
port: int = 0,
loop: asyncio.events.AbstractEventLoop | None = None)-
Expand source code
class HttpServer: """ HTTP server for VideoSDK agents debugging and monitoring. Provides endpoints for health checks, worker status, and debugging information. """ def __init__( self, host: str = "0.0.0.0", port: int = 0, loop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: self._loop = loop or asyncio.get_event_loop() self._host = host self._port = port self._app = web.Application(loop=self._loop) self._lock = asyncio.Lock() self._server = None self._worker = None @property def app(self) -> web.Application: """Get the aiohttp application.""" return self._app @property def port(self) -> int: """Get the port the server is listening on.""" return self._port def set_worker(self, worker: Any) -> None: """Set the worker instance for status endpoints.""" self._worker = worker async def start(self) -> None: """Start the HTTP server.""" async with self._lock: # Add routes - matching structure self._app.add_routes([web.get("/", self._handle_dashboard)]) self._app.add_routes([web.get("/debug/worker/", self._worker_debug)]) self._app.add_routes([web.get("/debug/runners/", self._runners_list)]) self._app.add_routes([web.get("/debug/runner/", self._runner_details)]) self._app.add_routes([web.get("/health", self._health_check)]) self._app.add_routes([web.get("/worker", self._worker_status)]) self._app.add_routes([web.get("/stats", self._worker_stats)]) self._app.add_routes([web.get("/debug", self._debug_info)]) self._app.add_routes([web.get("/api/status", self._api_status)]) # Create server handler = self._app.make_handler() self._server = await self._loop.create_server( handler, self._host, self._port ) # Get actual port if using port 0 if self._port == 0: self._port = self._server.sockets[0].getsockname()[1] await self._server.start_serving() async def aclose(self) -> None: """Close the HTTP server.""" async with self._lock: if self._server: self._server.close() await self._server.wait_closed() async def _handle_dashboard(self, request: web.Request) -> web.Response: """Serve the main dashboard HTML page - matching style.""" html_content = """<!DOCTYPE html> <html> <head> <meta charset="utf-8" /> <title>videosdk-agents - tracing</title> <style> body { font-family: sans-serif; margin: 8px; padding: 0; } .section { padding: 8px; font-size: 0.9em; margin-top: 8px; } .collapsible-title { display: block; cursor: pointer; user-select: none; } .collapsible-title::before { content: "▶ "; } .collapsible-title.expanded::before { content: "▼ "; } .collapsible-content { display: none; margin-left: 20px; /* optional indent for nested content */ } .nested-collapsible-title {} .nested-collapsible-content {} .horizontal-group { display: flex; align-items: center; margin-bottom: 8px; } .refresh-icon { font-size: 16px; font-weight: bold; margin-right: 4px; } canvas { border: 1px solid #ccc; } .graph-title { font-weight: bold; margin-top: 8px; } </style> </head> <body> <!-- Worker Section --> <div class="section"> <div class="horizontal-group"> <h2 style="margin: 0 8px 0 0">Worker</h2> <button onclick="refreshWorker()"> <span class="refresh-icon">⟳</span>Refresh </button> </div> <div id="workerSection"></div> </div> <!-- Runners List --> <div class="section"> <div class="horizontal-group"> <h2 style="margin: 0 8px 0 0">Runners</h2> <button onclick="refreshRunners()"> <span class="refresh-icon">⟳</span>Refresh </button> </div> <div id="runnersList"></div> </div> <script> // Global state to remember which collapsibles are open // runnerOpenState[runnerId] = { open: true/false, sub: { "Key/Value": bool, "Events": bool }, ... } // We'll also store 'Worker' as a special ID => runnerOpenState["__WORKER__"] for worker KV / Events const runnerOpenState = {}; const $ = (id) => document.getElementById(id); // ------------------------------ // HTTP Utility // ------------------------------ async function fetchJSON(url) { const r = await fetch(url); if (!r.ok) throw new Error("Network error"); return r.json(); } // ------------------------------ // Collapsible toggle logic // ------------------------------ function toggleCollapsible(titleEl, contentEl) { const isOpen = contentEl.style.display === "block"; contentEl.style.display = isOpen ? "none" : "block"; titleEl.classList.toggle("expanded", !isOpen); } // Re-apply state if we know something should be open function applyOpenState(titleEl, contentEl, open) { if (open) { contentEl.style.display = "block"; titleEl.classList.add("expanded"); } else { contentEl.style.display = "none"; titleEl.classList.remove("expanded"); } } // ------------------------------ // Time label // ------------------------------ function timeLabel(val) { const d = new Date(val * 1000); let hh = String(d.getHours()).padStart(2, "0"); let mm = String(d.getMinutes()).padStart(2, "0"); let ss = String(d.getSeconds()).padStart(2, "0"); return `${hh}:${mm}:${ss}`; } // ------------------------------ // Export Utility // ------------------------------ function exportEventsToJSON(events) { const dataStr = JSON.stringify(events, null, 2); const blob = new Blob([dataStr], { type: "application/json" }); const url = URL.createObjectURL(blob); // Create a temporary link and auto-click to download const link = document.createElement("a"); link.href = url; link.download = "events.json"; document.body.appendChild(link); link.click(); // Cleanup document.body.removeChild(link); URL.revokeObjectURL(url); } // ------------------------------ // Rendering Tracing Data // ------------------------------ function renderKeyValue(container, kv) { const ul = document.createElement("ul"); Object.entries(kv).forEach(([k, v]) => { const li = document.createElement("li"); li.textContent = `${k}: ${JSON.stringify(v)}`; ul.appendChild(li); }); container.appendChild(ul); } // // Keep each event on a single line. Don't show "click to expand" if data is null. // function renderEvents(container, events) { const ul = document.createElement("ul"); events.forEach((e) => { // Each event => list item const li = document.createElement("li"); // Create a wrapper span for the event name/time const titleLine = document.createElement("span"); titleLine.textContent = `${new Date( e.timestamp * 1000 ).toLocaleTimeString()} - ${e.name}`; li.appendChild(titleLine); // Only show the collapsible "Data" button if e.data is not null if (e.data != null) { const dataTitle = document.createElement("span"); dataTitle.style.fontSize = "0.8em"; dataTitle.style.marginLeft = "10px"; dataTitle.style.cursor = "pointer"; dataTitle.textContent = "[Data (click to expand)]"; // Collapsible content block (hidden by default) const dataContent = document.createElement("div"); dataContent.className = "collapsible-content nested-collapsible-content"; dataContent.style.display = "none"; // Pretty-print JSON with 2-space indentation const pre = document.createElement("pre"); pre.textContent = JSON.stringify(e.data, null, 2); dataContent.appendChild(pre); li.appendChild(dataTitle); li.appendChild(dataContent); // Wire up the click event to toggle the data display dataTitle.addEventListener("click", () => { toggleCollapsible(dataTitle, dataContent); }); } ul.appendChild(li); }); container.appendChild(ul); } function drawGraph(canvas, g) { const ctx = canvas.getContext("2d"); const w = canvas.width, h = canvas.height, pad = 40; ctx.clearRect(0, 0, w, h); if (!g.data?.length) { ctx.fillText("No data", w / 2 - 20, h / 2); return; } const xs = g.data.map((d) => d[0]); const ys = g.data.map((d) => d[1]); let [minX, maxX] = [Math.min(...xs), Math.max(...xs)]; if (minX === maxX) [minX, maxX] = [0, 1]; let [minY, maxY] = [Math.min(...ys), Math.max(...ys)]; if (g.y_range) [minY, maxY] = g.y_range; else if (minY === maxY) [minY, maxY] = [0, 1]; // Axes ctx.strokeStyle = "#000"; ctx.beginPath(); ctx.moveTo(pad, h - pad); ctx.lineTo(w - pad, h - pad); ctx.moveTo(pad, pad); ctx.lineTo(pad, h - pad); ctx.stroke(); const pw = w - 2 * pad, ph = h - 2 * pad; const toCX = (x) => pad + (x - minX) * (pw / (maxX - minX)); const toCY = (y) => h - pad - (y - minY) * (ph / (maxY - minY)); // Graph line ctx.strokeStyle = "red"; ctx.beginPath(); ctx.moveTo(toCX(xs[0]), toCY(ys[0])); for (let i = 1; i < xs.length; i++) { ctx.lineTo(toCX(xs[i]), toCY(ys[i])); } ctx.stroke(); // Ticks ctx.strokeStyle = "#000"; ctx.fillStyle = "#000"; ctx.font = "10px sans-serif"; // X for (let i = 0; i <= 5; i++) { let vx = minX + (i * (maxX - minX)) / 5; let cx = toCX(vx), cy = h - pad; ctx.beginPath(); ctx.moveTo(cx, cy); ctx.lineTo(cx, cy + 5); ctx.stroke(); let label = g.x_type === "time" ? timeLabel(vx) : vx.toFixed(2); let tw = ctx.measureText(label).width; ctx.fillText(label, cx - tw / 2, cy + 15); } // Y for (let i = 0; i <= 5; i++) { let vy = minY + (i * (maxY - minY)) / 5; let cx = pad, cy = toCY(vy); ctx.beginPath(); ctx.moveTo(cx, cy); ctx.lineTo(cx - 5, cy); ctx.stroke(); let lbl = vy.toFixed(2), tw = ctx.measureText(lbl).width; ctx.fillText(lbl, cx - tw - 6, cy + 3); } // Labels if (g.x_label) { let tw = ctx.measureText(g.x_label).width; ctx.fillText(g.x_label, w / 2 - tw / 2, h - 5); } if (g.y_label) { ctx.save(); ctx.translate(10, h / 2); ctx.rotate(-Math.PI / 2); ctx.textAlign = "center"; ctx.fillText(g.y_label, 0, 0); ctx.restore(); } } function renderGraphs(container, graphs) { graphs.forEach((g) => { const gt = document.createElement("div"); gt.className = "graph-title"; gt.innerText = g.title; container.appendChild(gt); const c = document.createElement("canvas"); c.width = 400; c.height = 200; container.appendChild(c); drawGraph(c, g); }); } // Render top-level Key/Value, Events, Graphs function renderTracing(container, tracing, runnerId = "__WORKER__") { if (!tracing) { container.textContent = "No tracing data"; return; } // Key/Value if (tracing.kv) { const kvTitle = document.createElement("div"); kvTitle.className = "collapsible-title nested-collapsible-title"; kvTitle.innerText = "Key/Value"; container.appendChild(kvTitle); const kvContent = document.createElement("div"); kvContent.className = "collapsible-content nested-collapsible-content"; container.appendChild(kvContent); // Ensure the open state matches what we have in runnerOpenState let subKey = "Key/Value"; applyOpenState( kvTitle, kvContent, getSubSectionOpen(runnerId, subKey) ); kvTitle.onclick = () => { toggleCollapsible(kvTitle, kvContent); setSubSectionOpen( runnerId, subKey, kvContent.style.display === "block" ); }; renderKeyValue(kvContent, tracing.kv); } // Events if (tracing.events) { const eTitle = document.createElement("div"); eTitle.className = "collapsible-title nested-collapsible-title"; eTitle.innerText = "Events"; container.appendChild(eTitle); const eContent = document.createElement("div"); eContent.className = "collapsible-content nested-collapsible-content"; container.appendChild(eContent); let subKey = "Events"; applyOpenState(eTitle, eContent, getSubSectionOpen(runnerId, subKey)); eTitle.onclick = () => { toggleCollapsible(eTitle, eContent); setSubSectionOpen( runnerId, subKey, eContent.style.display === "block" ); }; // Create a button to export the events to JSON const exportBtn = document.createElement("button"); exportBtn.textContent = "Export Events to JSON"; exportBtn.style.marginBottom = "8px"; exportBtn.onclick = () => exportEventsToJSON(tracing.events); eContent.appendChild(exportBtn); // Render the events renderEvents(eContent, tracing.events); } // Graphs if (tracing.graph) { renderGraphs(container, tracing.graph); } } // ------------------------------ // Global State Accessors // ------------------------------ function getRunnerState(id) { if (!runnerOpenState[id]) { runnerOpenState[id] = { open: false, sub: {} }; } return runnerOpenState[id]; } function isRunnerOpen(id) { return getRunnerState(id).open; } function setRunnerOpen(id, open) { getRunnerState(id).open = open; } function getSubSectionOpen(runnerId, subsection) { return getRunnerState(runnerId).sub[subsection] === true; } function setSubSectionOpen(runnerId, subsection, open) { getRunnerState(runnerId).sub[subsection] = open; } // ------------------------------ // Worker // ------------------------------ async function refreshWorker() { const sec = $("workerSection"); sec.textContent = "Loading..."; try { const data = await fetchJSON("/debug/worker/"); sec.innerHTML = ""; renderTracing(sec, data.tracing, "__WORKER__"); // use a special ID } catch (e) { sec.textContent = "Error: " + e; } } // ------------------------------ // Runners // ------------------------------ async function refreshRunners() { const rl = $("runnersList"); rl.textContent = "Loading..."; try { const data = await fetchJSON("/debug/runners/"); rl.innerHTML = ""; data.runners.forEach((r) => { const runnerId = String(r.id); const wrap = document.createElement("div"); wrap.style.marginBottom = "16px"; // Collapsible runner title const title = document.createElement("div"); title.className = "collapsible-title"; title.innerText = `room: ${r.room} — status: ${r.status}, task_id: ${r.task_id} ${r.id}`; wrap.appendChild(title); // Collapsible content const content = document.createElement("div"); content.className = "collapsible-content"; wrap.appendChild(content); // Apply saved open state from runnerOpenState applyOpenState(title, content, isRunnerOpen(runnerId)); // On title click => toggle + fetch details (only if we open) title.onclick = async () => { if (content.style.display !== "block") { // about to open content.textContent = "Loading..."; toggleCollapsible(title, content); setRunnerOpen(runnerId, true); await fetchRunnerDetails(runnerId, content); } else { // about to close toggleCollapsible(title, content); setRunnerOpen(runnerId, false); } }; rl.appendChild(wrap); // If runner is open from before, we fetch details right away if (isRunnerOpen(runnerId)) { fetchRunnerDetails(runnerId, content); } }); } catch (e) { rl.textContent = "Error: " + e; } } async function fetchRunnerDetails(id, container) { try { const data = await fetchJSON( `/debug/runner/?id=${encodeURIComponent(id)}` ); container.innerHTML = ""; const dataDiv = document.createElement("div"); container.appendChild(dataDiv); await loadRunnerTracing(id, dataDiv); } catch (e) { container.textContent = "Error: " + e; } } async function loadRunnerTracing(id, container) { try { const d = await fetchJSON( `/debug/runner/?id=${encodeURIComponent(id)}` ); container.innerHTML = ""; renderTracing(container, d.tracing, id); } catch (e) { container.textContent = "Error: " + e; } } // Initial calls refreshWorker(); refreshRunners(); </script> </body> </html>""" return web.Response(text=html_content, content_type="text/html") async def _worker_debug(self, request: web.Request) -> web.Response: """Worker debug endpoint - matching structure.""" try: if not self._worker: return web.json_response({"tracing": None}) # Get tracing data from the tracing system from .tracing import Tracing tracing_data = Tracing.export_for_handle("worker") # Add worker stats as key-value data if not already present if not tracing_data.get("kv"): try: stats = self._worker.get_stats() tracing_data["kv"] = { "agent_id": stats.get("agent_id", "Unknown"), "executor_type": ( getattr( self._worker.options, "executor_type", "Unknown" ).value if hasattr(self._worker.options, "executor_type") else "Unknown" ), "worker_load": stats.get("worker_load", 0.0), "current_jobs": stats.get("current_jobs", 0), "max_processes": stats.get("max_processes", 0), "backend_connected": stats.get("backend_connected", False), "worker_id": stats.get("worker_id", "unregistered"), "draining": stats.get("draining", False), "register": stats.get("register", False), } except Exception as stats_error: logger.error(f"Error getting worker stats: {stats_error}") tracing_data["kv"] = { "agent_id": getattr( self._worker.options, "agent_id", "Unknown" ), "executor_type": ( getattr( self._worker.options, "executor_type", "Unknown" ).value if hasattr(self._worker.options, "executor_type") else "Unknown" ), "worker_load": 0.0, "current_jobs": 0, "max_processes": 0, "backend_connected": False, "worker_id": "unregistered", "draining": False, "register": getattr(self._worker.options, "register", False), "error": f"Stats error: {str(stats_error)}", } # Add some default events if none exist if not tracing_data.get("events"): tracing_data["events"] = [ { "timestamp": time.time(), "name": "worker_started", "data": { "agent_id": getattr( self._worker.options, "agent_id", "Unknown" ) }, } ] # Add graphs if available try: if ( hasattr(self._worker, "_worker_load_graph") and self._worker._worker_load_graph ): graph_data = self._worker._worker_load_graph.export() if graph_data.get("data"): tracing_data["graph"] = [graph_data] except Exception as graph_error: logger.error(f"Error getting worker load graph: {graph_error}") return web.json_response({"tracing": tracing_data}) except Exception as e: logger.error(f"Error in worker debug endpoint: {e}") return web.json_response({"error": str(e)}, status=500) async def _runners_list(self, request: web.Request) -> web.Response: """Runners list endpoint - matching structure.""" try: if not self._worker: return web.json_response({"runners": []}) runners = [] # Get current jobs as runners if hasattr(self._worker, "_current_jobs"): for job_id, job_info in self._worker._current_jobs.items(): try: # Extract room information safely room_options = {} if hasattr(job_info, "job") and job_info.job: if hasattr(job_info.job, "room_options"): room_options = job_info.job.room_options elif isinstance(job_info.job, dict): room_options = job_info.job.get("room_options", {}) runners.append( { "id": job_id, "room": getattr(room_options, "room_id", "unknown"), "status": "running", # Default status "task_id": job_id, # Changed from job_id to task_id } ) except Exception as e: # Log error but continue with other jobs logger.warning(f"Error processing job {job_id}: {e}") runners.append( { "id": job_id, "room": "error", "status": "error", "task_id": job_id, # Changed from job_id to task_id } ) # If no runners found and we're in direct mode, create a placeholder if not runners and not getattr(self._worker.options, "register", False): # Check if we have any active processes/threads try: if ( hasattr(self._worker, "process_manager") and self._worker.process_manager ): stats = self._worker.process_manager.get_stats() # New execution module returns different stats format executor_stats = stats.get("executor_stats", {}) active_tasks = executor_stats.get("pending_tasks", 0) running_tasks = executor_stats.get("running_tasks", 0) total_active = active_tasks + running_tasks for i in range(total_active): runners.append( { "id": f"direct_job_{i}", "room": "direct_mode", "status": "running", "task_id": f"direct_job_{i}", # Changed from job_id to task_id } ) except Exception as e: logger.warning(f"Error getting process manager stats: {e}") # Add a default runner if we can't get stats runners.append( { "id": "direct_job_0", "room": "direct_mode", "status": "unknown", "task_id": "direct_job_0", # Changed from job_id to task_id } ) # If still no runners, add a placeholder for the current worker if not runners: runners.append( { "id": "worker_main", "room": "main_worker", "status": "idle", "task_id": "worker_main", # Changed from job_id to task_id } ) return web.json_response({"runners": runners}) except Exception as e: logger.error(f"Error in _runners_list: {e}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") return web.json_response({"error": str(e)}, status=500) async def _runner_details(self, request: web.Request) -> web.Response: """Runner details endpoint - matching structure.""" try: runner_id = request.query.get("id") if not runner_id: return web.json_response({"error": "Missing runner ID"}, status=400) if not self._worker: return web.json_response({"tracing": None}) # Handle placeholder runners if runner_id == "worker_main": # Return worker-level tracing data from .tracing import Tracing tracing_data = Tracing.export_for_handle("worker") # Add worker-specific key-value data try: stats = self._worker.get_stats() tracing_data["kv"] = { "runner_id": runner_id, "type": "main_worker", "status": "idle", "agent_id": stats.get("agent_id", "Unknown"), "executor_type": ( getattr( self._worker.options, "executor_type", "Unknown" ).value if hasattr(self._worker.options, "executor_type") else "Unknown" ), "register": stats.get("register", False), } except Exception as e: logger.warning( f"Error getting worker stats for runner details: {e}" ) tracing_data["kv"] = { "runner_id": runner_id, "type": "main_worker", "status": "idle", "agent_id": getattr( self._worker.options, "agent_id", "Unknown" ), "executor_type": ( getattr( self._worker.options, "executor_type", "Unknown" ).value if hasattr(self._worker.options, "executor_type") else "Unknown" ), "register": getattr(self._worker.options, "register", False), "error": f"Stats error: {str(e)}", } return web.json_response({"tracing": tracing_data}) # Handle direct job runners if runner_id.startswith("direct_job_"): from .tracing import Tracing tracing_data = Tracing.export_for_handle(f"runner_{runner_id}") # Add direct job key-value data tracing_data["kv"] = { "runner_id": runner_id, "type": "direct_job", "status": "running", "mode": "direct", } return web.json_response({"tracing": tracing_data}) # Handle regular backend jobs if not hasattr(self._worker, "_current_jobs"): return web.json_response({"tracing": None}) # Find the runner job_info = self._worker._current_jobs.get(runner_id) if not job_info: return web.json_response({"tracing": None}) # Get tracing data for this specific runner from .tracing import Tracing tracing_data = Tracing.export_for_handle(f"runner_{runner_id}") # Add job-specific key-value data room_options = {} if hasattr(job_info, "job") and job_info.job: if hasattr(job_info.job, "room_options"): room_options = job_info.job.room_options elif isinstance(job_info.job, dict): room_options = job_info.job.get("room_options", {}) tracing_data["kv"] = { "task_id": runner_id, # Changed from job_id to task_id "room_id": getattr(room_options, "room_id", "unknown"), "room_name": getattr(room_options, "name", "unknown"), "status": "running", "worker_id": getattr(job_info, "worker_id", "unknown"), "url": getattr(job_info, "url", "unknown"), } # Add job-specific events if none exist if not tracing_data.get("events"): tracing_data["events"] = [ { "timestamp": time.time(), "name": "job_started", "data": { "job_id": runner_id, "room_id": getattr(room_options, "room_id", "unknown"), }, } ] return web.json_response({"tracing": tracing_data}) except Exception as e: logger.error(f"Error in _runner_details: {e}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") return web.json_response({"error": str(e)}, status=500) async def _api_status(self, request: web.Request) -> web.Response: """API endpoint for dashboard data.""" try: worker_status = None stats = None if self._worker: try: worker_status = { "available": True, "connected": ( self._worker.backend_connection.is_connected if self._worker.backend_connection else False ), "worker_id": ( self._worker.backend_connection.worker_id if self._worker.backend_connection else "unregistered" ), "options": { "agent_id": getattr( self._worker.options, "agent_id", "Unknown" ), "executor_type": ( getattr( self._worker.options, "executor_type", "Unknown" ).value if hasattr(self._worker.options, "executor_type") else "Unknown" ), "register": getattr( self._worker.options, "register", False ), "max_processes": getattr( self._worker.options, "max_processes", 0 ), "log_level": getattr( self._worker.options, "log_level", "INFO" ), }, } except Exception as e: worker_status = {"available": False, "error": str(e)} try: stats = self._worker.get_stats() except Exception as e: stats = {"error": str(e)} server_info = { "host": self._host, "port": self._port, "endpoints": [ "/", "/health", "/worker", "/stats", "/debug", "/api/status", "/debug/worker/", "/debug/runners/", "/debug/runner/", ], } return web.json_response( { "worker": worker_status, "stats": stats, "server": server_info, "timestamp": time.time(), } ) except Exception as e: return web.json_response({"error": str(e)}, status=500) async def _health_check(self, request: web.Request) -> web.Response: """Health check endpoint.""" return web.Response(text="OK", content_type="text/plain") async def _worker_status(self, request: web.Request) -> web.Response: """Worker status endpoint.""" if not self._worker: return web.json_response({"error": "Worker not available"}) try: # Get current jobs count from the worker current_jobs = ( len(self._worker._current_jobs) if hasattr(self._worker, "_current_jobs") else 0 ) status = { "agent_id": getattr(self._worker.options, "agent_id", "Unknown"), "executor_type": ( getattr(self._worker.options, "executor_type", "Unknown").value if hasattr(self._worker.options, "executor_type") else "Unknown" ), "active_jobs": current_jobs, "connected": ( self._worker.backend_connection.is_connected if self._worker.backend_connection else False ), "worker_id": ( self._worker.backend_connection.worker_id if self._worker.backend_connection else "unregistered" ), "register": getattr(self._worker.options, "register", False), "draining": getattr(self._worker, "_draining", False), "worker_load": getattr(self._worker, "_worker_load", 0.0), } except Exception as e: logger.error(f"Error getting worker status: {e}") status = { "agent_id": getattr(self._worker.options, "agent_id", "Unknown"), "executor_type": "Unknown", "active_jobs": 0, "connected": False, "worker_id": "unregistered", "register": getattr(self._worker.options, "register", False), "draining": False, "worker_load": 0.0, "error": str(e), } return web.json_response(status) async def _worker_stats(self, request: web.Request) -> web.Response: """Worker statistics endpoint.""" if not self._worker: return web.json_response({"error": "Worker not available"}) try: stats = self._worker.get_stats() return web.json_response(stats) except Exception as e: logger.error(f"Error getting worker stats: {e}") return web.json_response( { "error": str(e), "agent_id": getattr(self._worker.options, "agent_id", "Unknown"), "executor_type": "Unknown", "current_jobs": 0, "max_processes": getattr(self._worker.options, "max_processes", 0), "register": getattr(self._worker.options, "register", False), } ) async def _debug_info(self, request: web.Request) -> web.Response: """Debug information endpoint.""" try: debug_info = { "server": { "host": self._host, "port": self._port, "endpoints": ["/", "/health", "/worker", "/stats", "/debug"], }, "worker": { "available": self._worker is not None, "options": ( { "agent_id": ( getattr(self._worker.options, "agent_id", "Unknown") if self._worker else None ), "executor_type": ( getattr( self._worker.options, "executor_type", "Unknown" ).value if self._worker and hasattr(self._worker.options, "executor_type") else "Unknown" ), "register": ( getattr(self._worker.options, "register", False) if self._worker else None ), "max_processes": ( getattr(self._worker.options, "max_processes", 0) if self._worker else 0 ), } if self._worker else None ), }, } return web.json_response(debug_info) except Exception as e: logger.error(f"Error in debug info endpoint: {e}") return web.json_response( { "error": str(e), "server": { "host": self._host, "port": self._port, "endpoints": ["/", "/health", "/worker", "/stats", "/debug"], }, "worker": { "available": self._worker is not None, "options": None, }, } )HTTP server for VideoSDK agents debugging and monitoring.
Provides endpoints for health checks, worker status, and debugging information.
Instance variables
prop app : aiohttp.web_app.Application-
Expand source code
@property def app(self) -> web.Application: """Get the aiohttp application.""" return self._appGet the aiohttp application.
prop port : int-
Expand source code
@property def port(self) -> int: """Get the port the server is listening on.""" return self._portGet the port the server is listening on.
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: """Close the HTTP server.""" async with self._lock: if self._server: self._server.close() await self._server.wait_closed()Close the HTTP server.
def set_worker(self, worker: Any) ‑> None-
Expand source code
def set_worker(self, worker: Any) -> None: """Set the worker instance for status endpoints.""" self._worker = workerSet the worker instance for status endpoints.
async def start(self) ‑> None-
Expand source code
async def start(self) -> None: """Start the HTTP server.""" async with self._lock: # Add routes - matching structure self._app.add_routes([web.get("/", self._handle_dashboard)]) self._app.add_routes([web.get("/debug/worker/", self._worker_debug)]) self._app.add_routes([web.get("/debug/runners/", self._runners_list)]) self._app.add_routes([web.get("/debug/runner/", self._runner_details)]) self._app.add_routes([web.get("/health", self._health_check)]) self._app.add_routes([web.get("/worker", self._worker_status)]) self._app.add_routes([web.get("/stats", self._worker_stats)]) self._app.add_routes([web.get("/debug", self._debug_info)]) self._app.add_routes([web.get("/api/status", self._api_status)]) # Create server handler = self._app.make_handler() self._server = await self._loop.create_server( handler, self._host, self._port ) # Get actual port if using port 0 if self._port == 0: self._port = self._server.sockets[0].getsockname()[1] await self._server.start_serving()Start the HTTP server.
class Tracing (name: str)-
Expand source code
class Tracing: """ Tracing system for VideoSDK agents. Provides performance monitoring, debugging, and metrics collection. """ _graphs: Dict[str, TracingGraph] = {} _handles: Dict[str, "Tracing"] = {} _events: Dict[str, List[TracingEvent]] = defaultdict(list) _kv_data: Dict[str, Dict[str, Any]] = defaultdict(dict) def __init__(self, name: str): self.name = name self._graphs = {} self._events = defaultdict(list) self._kv_data = defaultdict(dict) @classmethod def with_handle(cls, name: str) -> "Tracing": """Get or create a tracing handle.""" if name not in cls._handles: cls._handles[name] = cls(name) return cls._handles[name] @classmethod def add_graph( cls, title: str, x_label: str = "time", y_label: str = "value", x_type: str = "time", y_range: tuple = (0, 100), max_data_points: int = 1000, ) -> TracingGraph: """Add a new tracing graph.""" graph = TracingGraph( title=title, x_label=x_label, y_label=y_label, x_type=x_type, y_range=y_range, max_data_points=max_data_points, ) cls._graphs[title] = graph return graph @classmethod def get_graph(cls, title: str) -> Optional[TracingGraph]: """Get a tracing graph by title.""" return cls._graphs.get(title) def add_point(self, graph_title: str, value: float, label: str = "") -> None: """Add a point to a specific graph.""" graph = self._graphs.get(graph_title) if graph: graph.add_point(value, label) def add_event(self, name: str, data: Optional[Dict[str, Any]] = None) -> None: """Add an event to the tracing system.""" event = TracingEvent(timestamp=time.time(), name=name, data=data) self._events[self.name].append(event) def set_kv(self, key: str, value: Any) -> None: """Set a key-value pair in the tracing data.""" self._kv_data[self.name][key] = value def get_kv(self, key: str) -> Any: """Get a key-value pair from the tracing data.""" return self._kv_data[self.name].get(key) def _export(self) -> Dict[str, Any]: """Export all data for this tracing instance.""" return { "name": self.name, "kv": dict(self._kv_data[self.name]), "events": [ {"timestamp": event.timestamp, "name": event.name, "data": event.data} for event in self._events[self.name] ], "graphs": {title: graph.export() for title, graph in self._graphs.items()}, } @classmethod def export_all(cls) -> Dict[str, Any]: """Export all tracing data.""" return { "global_graphs": { title: graph.export() for title, graph in cls._graphs.items() }, "handles": { name: handle._export() for name, handle in cls._handles.items() }, } @classmethod def export_for_handle(cls, handle_name: str) -> Dict[str, Any]: """Export tracing data for a specific handle.""" handle = cls._handles.get(handle_name) if handle: return handle._export() return {"kv": {}, "events": [], "graphs": []} @classmethod def create_debug_app(cls, worker: Any) -> Any: """Create a debug web application for tracing.""" from aiohttp import web async def tracing_index(request: web.Request) -> web.Response: """Serve the tracing dashboard HTML.""" html_content = """ <!DOCTYPE html> <html> <head> <title>VideoSDK Agents Debug Dashboard</title> <style> body { font-family: Arial, sans-serif; margin: 20px; } .section { margin: 20px 0; padding: 15px; border: 1px solid #ddd; border-radius: 5px; } .graph { margin: 10px 0; padding: 10px; background: #f9f9f9; } .endpoint { margin: 5px 0; } .endpoint a { color: #0066cc; text-decoration: none; } .endpoint a:hover { text-decoration: underline; } </style> </head> <body> <h1>VideoSDK Agents Debug Dashboard</h1> <div class="section"> <h2>Endpoints</h2> <div class="endpoint"><a href="/health">Health Check</a></div> <div class="endpoint"><a href="/worker">Worker Status</a></div> <div class="endpoint"><a href="/stats">Worker Statistics</a></div> <div class="endpoint"><a href="/tracing">Tracing Data</a></div> </div> <div class="section"> <h2>Worker Information</h2> <div id="worker-info">Loading...</div> </div> <div class="section"> <h2>Tracing Graphs</h2> <div id="tracing-graphs">Loading...</div> </div> <script> // Load worker info fetch('/worker') .then(response => response.json()) .then(data => { document.getElementById('worker-info').innerHTML = '<pre>' + JSON.stringify(data, null, 2) + '</pre>'; }); // Load tracing data fetch('/tracing') .then(response => response.json()) .then(data => { document.getElementById('tracing-graphs').innerHTML = '<pre>' + JSON.stringify(data, null, 2) + '</pre>'; }); </script> </body> </html> """ return web.Response(text=html_content, content_type="text/html") async def tracing_data(request: web.Request) -> web.Response: """Serve tracing data as JSON.""" return web.json_response(cls.export_all()) app = web.Application() app.add_routes([web.get("", tracing_index)]) app.add_routes([web.get("/", tracing_index)]) app.add_routes([web.get("/tracing", tracing_data)]) return appTracing system for VideoSDK agents.
Provides performance monitoring, debugging, and metrics collection.
Static methods
def add_graph(title: str,
x_label: str = 'time',
y_label: str = 'value',
x_type: str = 'time',
y_range: tuple = (0, 100),
max_data_points: int = 1000) ‑> TracingGraph-
Add a new tracing graph.
def create_debug_app(worker: Any) ‑> Any-
Create a debug web application for tracing.
def export_all() ‑> Dict[str, Any]-
Export all tracing data.
def export_for_handle(handle_name: str) ‑> Dict[str, Any]-
Export tracing data for a specific handle.
def get_graph(title: str) ‑> TracingGraph | None-
Get a tracing graph by title.
def with_handle(name: str) ‑> Tracing-
Get or create a tracing handle.
Methods
def add_event(self, name: str, data: Dict[str, Any] | None = None) ‑> None-
Expand source code
def add_event(self, name: str, data: Optional[Dict[str, Any]] = None) -> None: """Add an event to the tracing system.""" event = TracingEvent(timestamp=time.time(), name=name, data=data) self._events[self.name].append(event)Add an event to the tracing system.
def add_point(self, graph_title: str, value: float, label: str = '') ‑> None-
Expand source code
def add_point(self, graph_title: str, value: float, label: str = "") -> None: """Add a point to a specific graph.""" graph = self._graphs.get(graph_title) if graph: graph.add_point(value, label)Add a point to a specific graph.
def get_kv(self, key: str) ‑> Any-
Expand source code
def get_kv(self, key: str) -> Any: """Get a key-value pair from the tracing data.""" return self._kv_data[self.name].get(key)Get a key-value pair from the tracing data.
def set_kv(self, key: str, value: Any) ‑> None-
Expand source code
def set_kv(self, key: str, value: Any) -> None: """Set a key-value pair in the tracing data.""" self._kv_data[self.name][key] = valueSet a key-value pair in the tracing data.