Module agents.cli
Functions
def build_docker_image(main_file: pathlib.Path,
requirement_path: pathlib.Path,
worker_id: str,
save_tar: bool = False) ‑> str-
Expand source code
def build_docker_image(
    main_file: Path, requirement_path: Path, worker_id: str, save_tar: bool = False
) -> str:
    """Build the worker's Docker image.

    A Dockerfile is generated in the current working directory when none
    exists there; an existing Dockerfile is used as-is.

    Args:
        main_file: Entry-point script. It must be located under the current
            working directory because the generated Dockerfile refers to it
            relative to that directory.
        requirement_path: Requirements file. When it exists, the generated
            Dockerfile copies ``requirements.txt`` and installs from it.
        worker_id: Identifier used to name the image
            (``videosdk-worker-<worker_id>``).
        save_tar: When true, export the image with ``docker save`` into a
            freshly created temporary directory.

    Returns:
        The path of the exported tar file when ``save_tar`` is true,
        otherwise the image name.

    Raises:
        DockerError: If any step fails. Exceptions that already are
            ``VideoSDKError`` instances propagate unchanged.
    """
    try:
        workdir = Path.cwd()
        dockerfile_path = workdir / "Dockerfile"
        if not dockerfile_path.exists():
            # Anchor a relative main_file at the working directory first so
            # relative_to() accepts it; an absolute path is left untouched.
            # POSIX separators keep the CMD valid inside the Linux image even
            # when the Dockerfile is generated on Windows.
            main_file_rel = (workdir / main_file).relative_to(workdir).as_posix()

            if requirement_path.exists():
                requirements_install = (
                    "COPY requirements.txt .\n"
                    "RUN pip install --no-cache-dir -r requirements.txt"
                )
            else:
                requirements_install = (
                    "# No requirements.txt found, skipping dependency installation"
                )

            dockerfile_content = "\n".join(
                [
                    "FROM --platform=linux/arm64 python:3.11-slim",
                    "",
                    "WORKDIR /app",
                    "",
                    "# Copy requirements first for better caching",
                    requirements_install,
                    "",
                    "# Copy deployment code",
                    "COPY . .",
                    "",
                    "# Run the deployment",
                    f'CMD ["python", "{main_file_rel}"]',
                    "",
                ]
            )
            with open(dockerfile_path, "w") as f:
                f.write(dockerfile_content)
            console.print(f"[cyan]✓[/cyan] Created Dockerfile in {dockerfile_path}")

        image_name = f"videosdk-worker-{worker_id}"
        build_cmd = [
            "docker",
            "build",
            "-t",
            image_name,
            "--platform",
            "linux/arm64",
            "--build-arg",
            "BUILDPLATFORM=linux/arm64",
            ".",
        ]
        try:
            subprocess.run(build_cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            # Every branch of handle_docker_error raises DockerError.
            handle_docker_error(e)

        if not save_tar:
            return image_name

        # The tar file goes into its own temporary directory; it is not
        # removed here, the caller receives its path.
        import tempfile

        temp_dir = tempfile.mkdtemp()
        image_path = Path(temp_dir) / f"{image_name}.tar"
        try:
            save_cmd = ["docker", "save", "-o", str(image_path), image_name]
            subprocess.run(save_cmd, check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            handle_docker_error(e)
        return str(image_path)

    except VideoSDKError:
        raise
    except Exception as e:
        raise DockerError(f"Failed to build Docker image: {str(e)}") from e
def cleanup_container(container_name)-
Expand source code
def cleanup_container(container_name):
    """Stop and remove a Docker container, reporting progress on the console.

    This is best effort: every failure is printed to the console and nothing
    is raised to the caller.

    Args:
        container_name: Exact name of the container to clean up.
    """
    try:
        listing = subprocess.run(
            [
                "docker",
                "ps",
                "-a",
                "--filter",
                f"name={container_name}",
                "--format",
                "{{.Names}}",
            ],
            capture_output=True,
            text=True,
            check=True,
        )
        # Docker's name filter matches substrings, and so would a plain
        # ``in`` test on the raw output ("web" would match "web-2").
        # Compare against the exact names that were listed instead.
        existing_names = {
            name.strip()
            for row in listing.stdout.splitlines()
            for name in row.split(",")
        }
        if container_name not in existing_names:
            return  # Container doesn't exist

        # Stop container if running
        try:
            subprocess.run(
                ["docker", "stop", container_name],
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=10,
            )
            console.print(
                f"[cyan]✓[/cyan] Stopped container [cyan]{container_name}[/cyan]"
            )
        except subprocess.TimeoutExpired:
            console.print(
                f"[yellow]⚠[/yellow] Container [cyan]{container_name}[/cyan] stop timed out"
            )
        except subprocess.CalledProcessError:
            console.print(
                f"[yellow]⚠[/yellow] Container [cyan]{container_name}[/cyan] was not running"
            )

        # Remove container
        try:
            subprocess.run(
                ["docker", "rm", container_name],
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=10,
            )
            console.print(
                f"[cyan]✓[/cyan] Removed container [cyan]{container_name}[/cyan]"
            )
        except subprocess.TimeoutExpired:
            console.print(
                f"[yellow]⚠[/yellow] Container [cyan]{container_name}[/cyan] removal timed out"
            )
        except subprocess.CalledProcessError:
            console.print(
                f"[yellow]⚠[/yellow] Container [cyan]{container_name}[/cyan] could not be removed"
            )

    except Exception as e:
        console.print(
            f"[red]✗[/red] Error cleaning up container [cyan]{container_name}[/cyan]: {str(e)}"
        )
def create_dockerfile(directory: pathlib.Path, entry_point: str = 'main.py') ‑> pathlib.Path-
Expand source code
def create_dockerfile(directory: Path, entry_point: str = "main.py") -> Path:
    """Ensure ``directory`` contains a Dockerfile and return its path.

    An existing Dockerfile is never overwritten. A generated one installs
    from ``requirements.txt`` only when that file exists in ``directory``.

    Args:
        directory: Directory that should hold the Dockerfile.
        entry_point: Script the container runs with ``python``.

    Returns:
        Path of the Dockerfile inside ``directory``.

    Raises:
        DockerError: If writing the Dockerfile fails.
    """
    target = directory / "Dockerfile"
    if target.exists():
        return target

    if (directory / "requirements.txt").exists():
        deps_step = (
            "COPY requirements.txt .\n"
            "RUN pip install --no-cache-dir -r requirements.txt"
        )
    else:
        deps_step = "# No requirements.txt found, skipping dependency installation"

    recipe_lines = [
        "FROM --platform=linux/arm64 python:3.11-slim",
        "",
        "WORKDIR /app",
        "",
        "# Copy requirements first for better caching",
        deps_step,
        "",
        "# Copy deployment code",
        "COPY . .",
        "",
        "# Run the deployment",
        f'CMD ["python", "{entry_point}"]',
        "",
    ]

    try:
        with open(target, "w") as handle:
            handle.write("\n".join(recipe_lines))
        console.print(f"[cyan]✓[/cyan] Created Dockerfile in {directory}")
    except Exception as exc:
        raise DockerError(f"Failed to create Dockerfile: {str(exc)}")

    return target
def create_yaml_interactive() ‑> dict-
Expand source code
def create_yaml_interactive() -> dict:
    """Prompt for deployment settings and write them to ``videosdk.yaml``.

    The prompts cover, in order: deployment ID, entry file, environment file,
    auth token and the cloud-deployment flag. The deployment ID is validated
    right after it is entered, and the environment file is validated only if
    it already exists.

    Returns:
        The configuration dictionary that was written to disk.

    Raises:
        ConfigurationError: If the file cannot be written.
    """

    def announce(*messages):
        # Print each message on its own console line, in order.
        for message in messages:
            console.print(message)

    console.print(
        Panel.fit(
            "[yellow]No videosdk.yaml found in current directory[/yellow]\n\n"
            "Let's create one! I'll guide you through the process.",
            title="Configuration Setup",
            border_style="yellow",
        )
    )

    # Step 1: deployment identity and entry point
    announce(
        "\n[bold cyan]Step 1: Deployment Configuration[/bold cyan]",
        "[dim]This section defines your deployment's basic settings.[/dim]",
        "[dim]Deployment ID must be 3-64 characters long and can only contain letters, numbers, hyphens, and underscores.[/dim]",
    )
    deployment_id = click.prompt("Deployment ID")
    validate_worker_id(deployment_id)

    announce("[dim]Specify the path to your main Python file.[/dim]")
    entry_path = click.prompt("Path to main Python file", default="src/main.py")

    # Step 2: environment file
    announce(
        "\n[bold cyan]Step 2: Environment Configuration[/bold cyan]",
        "[dim]Configure your deployment's environment file.[/dim]",
        "[dim]Specify the path to your .env file (optional).[/dim]",
    )
    env_path = click.prompt("Path to environment file", default="./.env")
    env_file = Path(env_path)
    if env_file.exists():
        validate_env_file(env_file)

    # Step 3: secrets
    announce(
        "\n[bold cyan]Step 3: Secrets Configuration[/bold cyan]",
        "[dim]Configure your deployment's secrets.[/dim]",
        "[dim]Enter your VideoSDK authentication token.[/dim]",
    )
    auth_token = click.prompt("VideoSDK Auth Token")

    # Step 4: deployment target
    announce(
        "\n[bold cyan]Step 4: Deployment Configuration[/bold cyan]",
        "[dim]Configure your deployment settings.[/dim]",
        "[dim]Choose whether to enable cloud deployment.[/dim]",
    )
    cloud_enabled = click.confirm("Enable cloud deployment?", default=True)

    # Key order matters: the file is dumped with sort_keys=False.
    config = {
        "version": "1.0",
        "deployment": {
            "id": deployment_id,
            "entry": {"path": entry_path},
            "signaling_base_url": "api.videosdk.live",
        },
        "env": {"path": env_path},
        "secrets": {"VIDEOSDK_AUTH_TOKEN": auth_token},
        "deploy": {"cloud": cloud_enabled},
    }

    config_path = Path.cwd() / "videosdk.yaml"
    try:
        with open(config_path, "w") as stream:
            yaml.dump(config, stream, default_flow_style=False, sort_keys=False)
        console.print(
            Panel.fit(
                "[green]Success![/green] Created videosdk.yaml at:\n"
                f"[cyan]{config_path}[/cyan]\n\n"
                "You can now run your worker with:\n"
                "[green]videosdk run[/green]",
                title="Configuration Complete",
                border_style="green",
            )
        )
        return config
    except Exception as exc:
        raise ConfigurationError(f"Failed to create videosdk.yaml: {str(exc)}")
def format_log_line(line: str, color: str = None) ‑> str-
Expand source code
def format_log_line(line: str, color: str = None) -> str:
    """Wrap a log line in console markup chosen from its apparent severity.

    ANSI escape sequences are stripped first. The line is then searched,
    case-insensitively, for severity keywords; the first matching group in
    the order error, warning, info, debug decides the style. If none match,
    ``color`` is used when given, otherwise the stripped line is returned
    unchanged.

    Args:
        line: Raw log line.
        color: Fallback markup style for lines without a severity keyword.

    Returns:
        The line without ANSI codes, wrapped in markup tags where applicable.
    """
    import re

    plain = re.sub(r"\x1b\[[0-9;]*[a-zA-Z]", "", line)
    lowered = plain.lower()

    # Ordered by priority: an earlier group wins over a later one.
    severity_styles = (
        (("error", "exception", "failed", "failure"), "red"),
        (("warning", "warn"), "yellow"),
        (("info", "information"), "cyan"),
        (("debug",), "dim"),
    )
    for keywords, style in severity_styles:
        for keyword in keywords:
            if keyword in lowered:
                return f"[{style}]{plain}[/{style}]"

    if color:
        return f"[{color}]{plain}[/{color}]"
    return plain
def get_headers(config: dict) ‑> dict-
Expand source code
def get_headers(config: dict) -> dict:
    """Build the HTTP headers for VideoSDK API requests.

    Args:
        config: Configuration mapping; the token is read from
            ``config["secrets"]["VIDEOSDK_AUTH_TOKEN"]``.

    Returns:
        A dict with the ``Authorization`` and ``Content-Type`` headers.

    Raises:
        ValidationError: If the token is missing or empty.
    """
    secrets = config.get("secrets", {})
    token = secrets.get("VIDEOSDK_AUTH_TOKEN")
    if token:
        return {"Authorization": f"{token}", "Content-Type": "application/json"}

    raise ValidationError(
        "VIDEOSDK_AUTH_TOKEN is required in videosdk.yaml secrets section"
    )
def handle_api_error(response: requests.models.Response) ‑> None-
Expand source code
def handle_api_error(response: requests.Response) -> None:
    """Translate a failed API response into an ``APIError``.

    The message, code and details are taken from the JSON body when it is a
    JSON object; otherwise the raw response text is used as the message.

    Args:
        response: The HTTP response to report.

    Raises:
        APIError: Always. The message depends on the status code.
    """
    # ValueError is the common base of the JSON decode errors (including
    # json.JSONDecodeError), so this also covers decoders whose error class
    # does not derive from json.JSONDecodeError.
    try:
        error_data = response.json()
    except ValueError:
        error_data = None

    # A valid JSON body is not necessarily an object (it may be a list or a
    # string); only a dict offers the fields read below.
    if isinstance(error_data, dict):
        error_message = error_data.get("message", "Unknown error occurred")
        error_details = error_data.get("details", {})
        error_code = error_data.get("code", "UNKNOWN_ERROR")
    else:
        error_message = response.text
        error_details = {}
        error_code = "INVALID_RESPONSE"

    details_text = json.dumps(error_details, indent=2) if error_details else "None"
    status = response.status_code

    if status == 401:
        raise APIError(
            "Authentication failed.\n"
            "Please check your VIDEOSDK_AUTH_TOKEN.\n"
            "You can get your token from the VideoSDK dashboard.\n\n"
            f"Server Error: {error_message}"
        )
    if status == 404:
        raise APIError(
            "Resource not found.\n"
            "Please check the deployment ID and try again.\n"
            "You can verify your deployment ID in videosdk.yaml\n\n"
            f"Server Error: {error_message}"
        )
    if status == 403:
        raise APIError(
            "Access denied.\n"
            "Please check your permissions and ensure your VIDEOSDK_AUTH_TOKEN is valid.\n\n"
            f"Server Error: {error_message}"
        )
    if status >= 500:
        raise APIError(
            "Server error.\n"
            "Please try again later or contact VideoSDK support if the issue persists.\n\n"
            f"Server Error Code: {error_code}\n"
            f"Server Error Message: {error_message}\n"
            f"Error Details: {details_text}"
        )
    if status >= 400:
        raise APIError(
            "API Error:\n"
            f"Status Code: {status}\n"
            f"Error Code: {error_code}\n"
            f"Error Message: {error_message}\n"
            f"Error Details: {details_text}"
        )
    raise APIError(f"Unexpected API response: {error_message}")
def handle_docker_error(error: subprocess.CalledProcessError) ‑> None-
Expand source code
def handle_docker_error(error: subprocess.CalledProcessError) -> None:
    """Translate a failed Docker command into a ``DockerError``.

    Args:
        error: The failure raised by ``subprocess.run(..., check=True)``.
            Its ``stderr`` may be bytes, text (when the command ran with
            ``text=True``) or empty.

    Raises:
        DockerError: Always. Known failure patterns get a tailored message;
            anything else is reported with the raw error text.
    """
    raw = error.stderr
    if not raw:
        error_msg = str(error)
    elif isinstance(raw, bytes):
        # Undecodable bytes must not hide the actual Docker failure.
        error_msg = raw.decode(errors="replace")
    else:
        # Already text: calling .decode() here would raise AttributeError.
        error_msg = str(raw)

    lowered = error_msg.lower()

    # Checked in order; the first pattern found in the output wins.
    known_failures = (
        (
            "permission denied",
            "Docker permission denied.\n"
            "Please ensure you have the necessary permissions to run Docker commands.\n"
            "You might need to run 'sudo usermod -aG docker $USER' and log out and back in.",
        ),
        (
            "no such file or directory",
            "Docker command not found.\n"
            "Please ensure Docker is installed and in your PATH.\n"
            "Visit https://docs.docker.com/get-docker/ for installation instructions.",
        ),
        (
            "port is already allocated",
            "Port is already in use.\n"
            "Please free up the port or use a different one.\n"
            "You can find the process using the port with 'lsof -i :<port>'",
        ),
        (
            "image not found",
            "Docker image not found.\n"
            "Please ensure the image exists and try rebuilding with 'videosdk run'",
        ),
        (
            "no space left on device",
            "No space left on device.\n"
            "Please free up some disk space and try again.\n"
            "You can use 'docker system prune' to clean up unused Docker resources.",
        ),
    )
    for pattern, message in known_failures:
        if pattern in lowered:
            raise DockerError(message)

    raise DockerError(f"Docker operation failed: {error_msg}")
def load_config() ‑> dict-
Expand source code
def load_config() -> dict:
    """Load and validate ``videosdk.yaml`` from the current working directory.

    When the file does not exist, the configuration is created interactively
    instead. Otherwise the file is parsed and its ``version``,
    ``deployment``, ``env``, ``secrets`` and ``deploy`` sections are checked.
    Defaults for ``env.path`` and ``deploy.cloud`` are stored in the returned
    configuration, and ``version`` is normalized to the string ``"1.0"``.

    Returns:
        The validated configuration dictionary.

    Raises:
        ConfigurationError: If the file is empty, is not valid YAML, or fails
            validation, and for any other error raised while loading it.
    """
    config_path = Path.cwd() / "videosdk.yaml"
    if not config_path.exists():
        # Create config interactively
        return create_yaml_interactive()

    try:
        with open(config_path, "r") as f:
            content = f.read().strip()

        if not content:
            raise ConfigurationError(
                "videosdk.yaml is empty.\n"
                "Please add configuration or run 'videosdk run' to create it interactively."
            )

        try:
            config = yaml.safe_load(content)
        except yaml.YAMLError as e:
            raise ConfigurationError(
                f"Invalid YAML in videosdk.yaml:\n{str(e)}\n"
                "Please check the file format and try again."
            ) from e

        # Validate that config is a dictionary
        if not isinstance(config, dict):
            raise ConfigurationError(
                "Invalid configuration format in videosdk.yaml.\n"
                f"Expected a dictionary, got {type(config).__name__}.\n"
                "Please check the file format and try again."
            )

        if not config:
            raise ConfigurationError(
                "videosdk.yaml is empty.\n"
                "Please add configuration or run 'videosdk run' to create it interactively."
            )

        # Validate version. An unquoted ``version: 1.0`` is parsed by YAML as
        # the float 1.0, so compare the textual form rather than rejecting it
        # with "Expected version 1.0, found: 1.0".
        version = config.get("version")
        if str(version) != "1.0":
            raise ConfigurationError(
                "Unsupported configuration version.\n"
                f"Expected version 1.0, found: {version}\n"
                "Please update your configuration to use version 1.0."
            )
        config["version"] = "1.0"

        # Validate deployment section
        deployment = config.get("deployment")
        if not deployment:
            raise ConfigurationError(
                "Missing 'deployment' section in videosdk.yaml.\n"
                "Please add a deployment section with required fields:\n"
                "deployment:\n"
                "  id: your-deployment-id\n"
                "  entry:\n"
                "    path: path/to/main.py"
            )
        if not isinstance(deployment, dict):
            raise ConfigurationError(
                "Invalid deployment section format.\n"
                f"Expected a dictionary, got {type(deployment).__name__}.\n"
                "Please check the deployment section format in videosdk.yaml."
            )

        worker_id = deployment.get("id")
        if not worker_id:
            raise ConfigurationError(
                "Missing 'deployment.id' in videosdk.yaml.\n"
                "Please add a unique identifier for your deployment:\n"
                "deployment:\n"
                "  id: your-deployment-id"
            )
        try:
            validate_worker_id(worker_id)
        except ValidationError as e:
            raise ConfigurationError(
                f"Invalid deployment ID in videosdk.yaml: {str(e)}"
            ) from e

        entry = deployment.get("entry", {})
        if not isinstance(entry, dict):
            raise ConfigurationError(
                "Invalid entry section format.\n"
                f"Expected a dictionary, got {type(entry).__name__}.\n"
                "Please check the entry section format in videosdk.yaml."
            )
        entry_path = entry.get("path")
        if not entry_path:
            raise ConfigurationError(
                "Missing 'deployment.entry.path' in videosdk.yaml.\n"
                "Please specify the path to your main Python file:\n"
                "deployment:\n"
                "  entry:\n"
                "    path: path/to/main.py"
            )

        # Validate signaling base URL. The key is read from the deployment
        # section, so the message names that section.
        signaling_base_url = deployment.get("signaling_base_url", "api.videosdk.live")
        if not signaling_base_url:
            raise ConfigurationError(
                "Missing 'deployment.signaling_base_url' in videosdk.yaml.\n"
                "Please specify the VideoSDK signaling base URL:\n"
                "deployment:\n"
                "  signaling_base_url: api.videosdk.live"
            )

        # Validate env section. setdefault() on the config itself keeps the
        # default in the returned dict even when the section was absent.
        env = config.setdefault("env", {})
        if not isinstance(env, dict):
            raise ConfigurationError(
                "Invalid env section format.\n"
                f"Expected a dictionary, got {type(env).__name__}.\n"
                "Please check the env section format in videosdk.yaml."
            )
        env.setdefault("path", "./.env")

        # Validate environment file if it exists
        env_path = Path(env["path"])
        if env_path.exists():
            validate_env_file(env_path)

        # Validate secrets section
        secrets = config.get("secrets", {})
        if not isinstance(secrets, dict):
            raise ConfigurationError(
                "Invalid secrets section format.\n"
                f"Expected a dictionary, got {type(secrets).__name__}.\n"
                "Please check the secrets section format in videosdk.yaml."
            )
        if not secrets.get("VIDEOSDK_AUTH_TOKEN"):
            raise ConfigurationError(
                "Missing 'VIDEOSDK_AUTH_TOKEN' in secrets section.\n"
                "Please add your VideoSDK authentication token:\n"
                "secrets:\n"
                "  VIDEOSDK_AUTH_TOKEN: your-token"
            )

        # Validate deploy section; the default is stored as for ``env``.
        deploy = config.setdefault("deploy", {})
        if not isinstance(deploy, dict):
            raise ConfigurationError(
                "Invalid deploy section format.\n"
                f"Expected a dictionary, got {type(deploy).__name__}.\n"
                "Please check the deploy section format in videosdk.yaml."
            )
        deploy.setdefault("cloud", True)

        return config

    except ConfigurationError:
        raise
    except Exception as e:
        raise ConfigurationError(
            f"Error reading videosdk.yaml:\n{str(e)}\n"
            "Please check the file format and try again.\n"
            "You can run 'videosdk run' to create a new configuration interactively."
        ) from e
def main()-
Expand source code
def main():
    """Run the CLI and turn failures into console messages and exit codes.

    ``VideoSDKError`` and any other unexpected exception exit with status 1;
    a keyboard interrupt exits with status 130.
    """
    try:
        cli()
    except VideoSDKError as err:
        console.print(f"\n[red]Error:[/red] {str(err)}")
        exit_status = 1
    except KeyboardInterrupt:
        console.print("\n[yellow]Operation cancelled by user[/yellow]")
        exit_status = 130
    except Exception as err:
        console.print(f"\n[red]Unexpected error:[/red] {str(err)}")
        console.print(
            "\n[yellow]If this error persists, please contact VideoSDK support.[/yellow]"
        )
        exit_status = 1
    else:
        # cli() returned normally: nothing to report.
        return

    sys.exit(exit_status)
def print_welcome()-
Expand source code
def print_welcome():
    """Show the welcome panel that lists the available CLI commands."""
    banner_lines = [
        "[bold cyan]Welcome to VideoSDK CLI![/bold cyan]",
        "",
        "[white]Available Commands:[/white]",
        "• [green]videosdk run[/green] - Run your worker locally",
        "• [green]videosdk deploy[/green] - Deploy your worker to VideoSDK Cloud",
        "",
        "[yellow]Note:[/yellow] Configuration is managed through videosdk.yaml",
    ]
    welcome_panel = Panel.fit(
        "\n".join(banner_lines),
        title="VideoSDK CLI",
        border_style="cyan",
    )
    console.print(welcome_panel)
def read_output(pipe, color=None)-
Expand source code
def read_output(pipe, color=None):
    """Print every non-blank line read from ``pipe`` to the console.

    Reading stops at the first empty string returned by ``pipe.readline()``.
    Each line is stripped and formatted with ``format_log_line`` before it is
    printed. Errors raised while reading are reported on the console, not
    propagated.

    Args:
        pipe: Object with a ``readline()`` method.
        color: Fallback style passed on to ``format_log_line``.
    """
    try:
        while True:
            raw = pipe.readline()
            if raw == "":
                break
            if not raw:
                continue
            text = raw.strip()
            if text:  # Only print non-empty lines
                console.print(format_log_line(text, color))
    except Exception as err:
        console.print(f"[red]Error reading output: {str(err)}[/red]")
def validate_build_files(main_file: pathlib.Path, requirement_path: pathlib.Path) ‑> None-
Expand source code
def validate_build_files(main_file: Path, requirement_path: Path) -> None:
    """Check the files needed to build the worker image.

    The main file must exist and be named ``main.py``. The requirements file
    is optional; when present it must be named ``requirements.txt`` and must
    not be empty.

    Args:
        main_file: Path of the entry-point script.
        requirement_path: Path of the requirements file.

    Raises:
        FileError: If a check fails or the requirements file cannot be read.
    """
    try:
        # Check main.py
        if not main_file.exists():
            raise FileError(
                f"Could not find your main file at {main_file}\n"
                "Please ensure the path in videosdk.yaml is correct."
            )
        if main_file.name != "main.py":
            raise FileError(
                "Your main file must be named main.py\n" f"Found: {main_file.name}"
            )

        # Check requirements.txt (optional)
        if not requirement_path.exists():
            return
        if requirement_path.name != "requirements.txt":
            raise FileError(
                "Your requirements file must be named requirements.txt\n"
                f"Found: {requirement_path.name}"
            )

        # Only the read itself is guarded, so the "empty file" error below
        # is not re-wrapped as a "Could not read" failure.
        try:
            with open(requirement_path, "r") as f:
                requirements = f.read().strip()
        except Exception as e:
            raise FileError(f"Could not read requirements.txt: {str(e)}") from e

        if not requirements:
            raise FileError(
                "Your requirements.txt file is empty.\n"
                "Please add your Python dependencies to requirements.txt"
            )

    except FileError:
        raise
    except Exception as e:
        raise FileError(f"Unexpected error during file validation: {str(e)}") from e
def validate_env_file(env_path: pathlib.Path) ‑> None-
Expand source code
def validate_env_file(env_path: Path) -> None:
    """Check that an environment file, if present, has ``KEY=VALUE`` lines.

    A missing file is accepted. Blank lines and lines starting with ``#``
    are ignored; every other line must contain ``=``.

    Args:
        env_path: Path of the environment file.

    Raises:
        ValidationError: If the file cannot be read, is empty, or contains a
            line without ``=``.
    """
    if not env_path.exists():
        return

    # Only the read itself is guarded, so the format errors raised below
    # keep their own message instead of being re-wrapped as read errors.
    try:
        with open(env_path, "r") as f:
            content = f.read().strip()
    except Exception as e:
        raise ValidationError(
            f"Error reading environment file {env_path}: {str(e)}"
        ) from e

    if not content:
        raise ValidationError(
            f"Environment file {env_path} is empty.\n"
            "Please add your environment variables or remove the file."
        )

    # Basic validation of .env file format
    for raw_line in content.split("\n"):
        entry = raw_line.strip()
        if not entry or entry.startswith("#"):
            continue
        if "=" not in entry:
            raise ValidationError(
                f"Invalid environment variable format in {env_path}:\n"
                f"Line: {entry}\n"
                "Expected format: KEY=VALUE"
            )
def validate_environment() ‑> None-
Expand source code
def validate_environment() -> None:
    """Verify that the ``docker version`` command can be run successfully.

    Raises:
        ValidationError: If the ``docker`` executable is not found, or if
            the command exits with a non-zero status.
    """
    install_hint = (
        "Visit https://docs.docker.com/get-docker/ for installation instructions."
    )
    probe_cmd = ["docker", "version"]

    try:
        subprocess.run(probe_cmd, capture_output=True, check=True)
    except FileNotFoundError:
        # The executable itself could not be found.
        raise ValidationError(
            "Docker is not installed.\n"
            "Please install Docker first.\n" + install_hint
        )
    except subprocess.CalledProcessError:
        # The executable ran but reported a failure.
        raise ValidationError(
            "Docker is not properly installed or not running.\n"
            "Please ensure Docker is installed and the Docker daemon is running.\n"
            + install_hint
        )
def validate_worker_id(worker_id: str) ‑> None-
Expand source code
def validate_worker_id(worker_id: str) -> None:
    """Check that a deployment ID is well formed.

    The ID must be non-empty, consist only of alphanumeric characters,
    hyphens and underscores, and be 3 to 64 characters long. The checks run
    in that order and the first failure is reported.

    Args:
        worker_id: Deployment ID to check.

    Raises:
        ValidationError: If any of the checks fails.
    """
    if not worker_id:
        raise ValidationError("Deployment ID cannot be empty")

    # Check for valid characters (alphanumeric, hyphen, underscore)
    has_invalid_char = any(
        not (ch.isalnum() or ch in "-_") for ch in worker_id
    )
    if has_invalid_char:
        raise ValidationError(
            "Deployment ID can only contain letters, numbers, hyphens, and underscores"
        )

    # Check length (between 3 and 64 characters)
    if not 3 <= len(worker_id) <= 64:
        raise ValidationError(
            "Deployment ID must be between 3 and 64 characters long"
        )
Classes
class APIError (*args, **kwargs)-
Expand source code
class APIError(VideoSDKError):
    """Raised when a VideoSDK API request fails."""
Ancestors
- VideoSDKError
- builtins.Exception
- builtins.BaseException
class ConfigurationError (*args, **kwargs)-
Expand source code
class ConfigurationError(VideoSDKError):
    """Raised when the configuration is missing, unreadable or invalid."""
Ancestors
- VideoSDKError
- builtins.Exception
- builtins.BaseException
class DockerError (*args, **kwargs)-
Expand source code
class DockerError(VideoSDKError):
    """Raised when a Docker-related operation fails."""
Ancestors
- VideoSDKError
- builtins.Exception
- builtins.BaseException
class FileError (*args, **kwargs)-
Expand source code
class FileError(VideoSDKError):
    """Raised when a required file is missing or invalid."""
Ancestors
- VideoSDKError
- builtins.Exception
- builtins.BaseException
class ValidationError (*args, **kwargs)-
Expand source code
class ValidationError(VideoSDKError):
    """Raised when a value or the environment fails validation."""
Ancestors
- VideoSDKError
- builtins.Exception
- builtins.BaseException
class VideoSDKError (*args, **kwargs)-
Expand source code
class VideoSDKError(Exception):
    """Common base class for all errors raised by the VideoSDK CLI."""
Ancestors
- builtins.Exception
- builtins.BaseException
Subclasses