From e7b4854f636af994be73189c645064bacde0d209 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 11 Mar 2026 15:29:55 +0000 Subject: [PATCH] feat: integrate MCP (Model Context Protocol) for agent compatibility Co-authored-by: davidmonterocrespo24 <47928504+davidmonterocrespo24@users.noreply.github.com> --- backend/app/mcp/__init__.py | 3 + backend/app/mcp/server.py | 406 +++++++++++++++++++++++++++++++ backend/app/mcp/wokwi.py | 279 +++++++++++++++++++++ backend/mcp_server.py | 31 +++ backend/mcp_sse_server.py | 46 ++++ backend/requirements.txt | 1 + backend/tests/__init__.py | 0 backend/tests/test_mcp_tools.py | 416 ++++++++++++++++++++++++++++++++ doc/MCP.md | 300 +++++++++++++++++++++++ 9 files changed, 1482 insertions(+) create mode 100644 backend/app/mcp/__init__.py create mode 100644 backend/app/mcp/server.py create mode 100644 backend/app/mcp/wokwi.py create mode 100644 backend/mcp_server.py create mode 100644 backend/mcp_sse_server.py create mode 100644 backend/tests/__init__.py create mode 100644 backend/tests/test_mcp_tools.py create mode 100644 doc/MCP.md diff --git a/backend/app/mcp/__init__.py b/backend/app/mcp/__init__.py new file mode 100644 index 0000000..cadd4cc --- /dev/null +++ b/backend/app/mcp/__init__.py @@ -0,0 +1,3 @@ +# MCP (Model Context Protocol) server for Velxio +# Exposes tools for circuit authoring, Wokwi JSON import/export, +# code generation, and compilation. diff --git a/backend/app/mcp/server.py b/backend/app/mcp/server.py new file mode 100644 index 0000000..688955c --- /dev/null +++ b/backend/app/mcp/server.py @@ -0,0 +1,406 @@ +""" +Velxio MCP Server + +Exposes the following tools to MCP-compatible agents (e.g. Claude): + + - compile_project Compile Arduino sketch files → hex / binary + - run_project Compile and return simulation-ready artifacts + - import_wokwi_json Parse a Wokwi diagram.json → Velxio circuit + - export_wokwi_json Serialise a Velxio circuit → Wokwi diagram.json + - create_circuit Create a new circuit definition + - update_circuit Merge changes into an existing circuit definition + - generate_code_files Generate starter Arduino code from a circuit + +Transport: + - stdio — run `python mcp_server.py` for Claude Desktop / CLI agents + - SSE — mounted at /mcp in the FastAPI app for HTTP-based agents +""" + +from __future__ import annotations + +import json +import sys +from typing import Annotated, Any + +from mcp.server.fastmcp import FastMCP + +from app.mcp.wokwi import ( + format_wokwi_diagram, + generate_arduino_sketch, + parse_wokwi_diagram, +) +from app.services.arduino_cli import ArduinoCLIService + +# --------------------------------------------------------------------------- +# Server setup +# --------------------------------------------------------------------------- + +mcp = FastMCP( + name="velxio", + instructions=( + "Velxio MCP server — create circuits, import/export Wokwi JSON, " + "generate Arduino code, and compile projects." + ), +) + +_arduino = ArduinoCLIService() + +# --------------------------------------------------------------------------- +# compile_project +# --------------------------------------------------------------------------- + + +@mcp.tool() +async def compile_project( + files: Annotated[ + list[dict[str, str]], + "List of source files. Each item must have 'name' (filename, e.g. 'sketch.ino') " + "and 'content' (file text).", + ], + board: Annotated[ + str, + "Arduino board FQBN, e.g. 'arduino:avr:uno' or 'rp2040:rp2040:rpipico'. " + "Defaults to 'arduino:avr:uno'.", + ] = "arduino:avr:uno", +) -> dict[str, Any]: + """ + Compile one or more Arduino sketch files and return the compiled artifact. + + Returns a dict with: + - success (bool) + - hex_content (str | null) Intel HEX for AVR boards + - binary_content (str | null) Base-64 .bin/.uf2 for RP2040 + - binary_type (str | null) 'bin' or 'uf2' + - stdout (str) + - stderr (str) + - error (str | null) + """ + for f in files: + if "name" not in f or "content" not in f: + return { + "success": False, + "error": "Each file entry must have 'name' and 'content' keys.", + "stdout": "", + "stderr": "", + } + + try: + result = await _arduino.compile(files, board) + return result + except Exception as exc: # pragma: no cover + return { + "success": False, + "error": str(exc), + "stdout": "", + "stderr": "", + } + + +# --------------------------------------------------------------------------- +# run_project +# --------------------------------------------------------------------------- + + +@mcp.tool() +async def run_project( + files: Annotated[ + list[dict[str, str]], + "List of source files (same format as compile_project).", + ], + board: Annotated[str, "Board FQBN (default: 'arduino:avr:uno')."] = "arduino:avr:uno", +) -> dict[str, Any]: + """ + Compile the project and return simulation-ready artifacts. + + The Velxio frontend can load the returned hex_content / binary_content + directly into its AVR / RP2040 emulator. Actual execution happens + client-side in the browser. + + Returns the same payload as compile_project plus a 'simulation_ready' flag. + """ + result = await compile_project(files=files, board=board) + result["simulation_ready"] = result.get("success", False) + return result + + +# --------------------------------------------------------------------------- +# import_wokwi_json +# --------------------------------------------------------------------------- + + +@mcp.tool() +async def import_wokwi_json( + diagram_json: Annotated[ + str, + "Wokwi diagram.json content as a JSON string. " + "Must contain at minimum a 'parts' array.", + ], +) -> dict[str, Any]: + """ + Parse a Wokwi diagram.json payload and return a Velxio circuit object. + + The returned circuit can be passed directly to export_wokwi_json, + generate_code_files, compile_project, or saved as a Velxio project. + + Returns: + - board_fqbn (str) Detected Arduino board FQBN + - components (list) List of component objects + - connections (list) List of connection objects + - version (int) + """ + try: + diagram = json.loads(diagram_json) + except json.JSONDecodeError as exc: + return {"error": f"Invalid JSON: {exc}"} + + if not isinstance(diagram, dict): + return {"error": "diagram_json must be a JSON object."} + + return parse_wokwi_diagram(diagram) + + +# --------------------------------------------------------------------------- +# export_wokwi_json +# --------------------------------------------------------------------------- + + +@mcp.tool() +async def export_wokwi_json( + circuit: Annotated[ + dict[str, Any], + "Velxio circuit object with 'components', 'connections', and 'board_fqbn'.", + ], + author: Annotated[str, "Author name to embed in the diagram (default: 'velxio')."] = "velxio", +) -> dict[str, Any]: + """ + Convert a Velxio circuit object into a Wokwi diagram.json payload. + + The returned payload is compatible with the Wokwi simulator and can be + imported using the Wokwi zip import feature in Velxio. + + Returns the Wokwi diagram dict (version, author, editor, parts, connections). + """ + if not isinstance(circuit, dict): + return {"error": "circuit must be a JSON object."} + + return format_wokwi_diagram(circuit, author=author) + + +# --------------------------------------------------------------------------- +# create_circuit +# --------------------------------------------------------------------------- + + +@mcp.tool() +async def create_circuit( + board_fqbn: Annotated[ + str, + "Arduino board FQBN. e.g. 'arduino:avr:uno', 'rp2040:rp2040:rpipico'.", + ] = "arduino:avr:uno", + components: Annotated[ + list[dict[str, Any]] | None, + "List of component objects. Each item may have: " + "id (str), type (str, Wokwi element type), left (number), top (number), " + "rotate (number), attrs (object).", + ] = None, + connections: Annotated[ + list[dict[str, Any]] | None, + "List of connection objects. Each item may have: " + "from_part (str), from_pin (str), to_part (str), to_pin (str), color (str).", + ] = None, +) -> dict[str, Any]: + """ + Create a new Velxio circuit definition. + + Example component types: wokwi-led, wokwi-pushbutton, wokwi-resistor, + wokwi-buzzer, wokwi-servo, wokwi-lcd1602. + + Example connection: + { "from_part": "uno", "from_pin": "13", "to_part": "led1", "to_pin": "A", + "color": "green" } + + Returns the new circuit object (board_fqbn, components, connections, version). + """ + components_list = components if components is not None else [] + connections_list = connections if connections is not None else [] + + # Normalise components + normalised_components: list[dict[str, Any]] = [] + for i, comp in enumerate(components_list): + normalised_components.append( + { + "id": comp.get("id", f"comp{i}"), + "type": comp.get("type", ""), + "left": float(comp.get("left", 0)), + "top": float(comp.get("top", 0)), + "rotate": int(comp.get("rotate", 0)), + "attrs": dict(comp.get("attrs", {})), + } + ) + + # Normalise connections + normalised_connections: list[dict[str, Any]] = [] + for conn in connections_list: + normalised_connections.append( + { + "from_part": conn.get("from_part", ""), + "from_pin": conn.get("from_pin", ""), + "to_part": conn.get("to_part", ""), + "to_pin": conn.get("to_pin", ""), + "color": conn.get("color", "green"), + } + ) + + return { + "board_fqbn": board_fqbn, + "components": normalised_components, + "connections": normalised_connections, + "version": 1, + } + + +# --------------------------------------------------------------------------- +# update_circuit +# --------------------------------------------------------------------------- + + +@mcp.tool() +async def update_circuit( + circuit: Annotated[ + dict[str, Any], + "Existing Velxio circuit object to update.", + ], + add_components: Annotated[ + list[dict[str, Any]] | None, + "Components to add. Merged after existing components.", + ] = None, + remove_component_ids: Annotated[ + list[str] | None, + "IDs of components to remove.", + ] = None, + add_connections: Annotated[ + list[dict[str, Any]] | None, + "Connections to add.", + ] = None, + remove_connections: Annotated[ + list[dict[str, Any]] | None, + "Connections to remove (matched by from_part+from_pin+to_part+to_pin).", + ] = None, + board_fqbn: Annotated[ + str | None, + "If provided, replaces the circuit board_fqbn.", + ] = None, +) -> dict[str, Any]: + """ + Merge changes into an existing Velxio circuit definition. + + Supports adding/removing components and connections, and changing the board. + + Returns the updated circuit object. + """ + if not isinstance(circuit, dict): + return {"error": "circuit must be a JSON object."} + + import copy + + updated = copy.deepcopy(circuit) + + if board_fqbn is not None: + updated["board_fqbn"] = board_fqbn + + # Remove components + if remove_component_ids: + remove_set = set(remove_component_ids) + updated["components"] = [ + c for c in updated.get("components", []) if c.get("id") not in remove_set + ] + + # Add components + existing_ids = {c.get("id") for c in updated.get("components", [])} + for i, comp in enumerate(add_components or []): + comp_id = comp.get("id", f"comp_new_{i}") + if comp_id in existing_ids: + comp_id = f"{comp_id}_new" + updated.setdefault("components", []).append( + { + "id": comp_id, + "type": comp.get("type", ""), + "left": float(comp.get("left", 0)), + "top": float(comp.get("top", 0)), + "rotate": int(comp.get("rotate", 0)), + "attrs": dict(comp.get("attrs", {})), + } + ) + + # Remove connections (exact match) + if remove_connections: + def _conn_key(c: dict[str, Any]) -> tuple[str, str, str, str]: + return ( + c.get("from_part", ""), + c.get("from_pin", ""), + c.get("to_part", ""), + c.get("to_pin", ""), + ) + + remove_keys = {_conn_key(c) for c in remove_connections} + updated["connections"] = [ + c for c in updated.get("connections", []) if _conn_key(c) not in remove_keys + ] + + # Add connections + for conn in (add_connections or []): + updated.setdefault("connections", []).append( + { + "from_part": conn.get("from_part", ""), + "from_pin": conn.get("from_pin", ""), + "to_part": conn.get("to_part", ""), + "to_pin": conn.get("to_pin", ""), + "color": conn.get("color", "green"), + } + ) + + return updated + + +# --------------------------------------------------------------------------- +# generate_code_files +# --------------------------------------------------------------------------- + + +@mcp.tool() +async def generate_code_files( + circuit: Annotated[ + dict[str, Any], + "Velxio circuit object (from create_circuit or import_wokwi_json).", + ], + sketch_name: Annotated[ + str, + "Base name for the generated sketch file (without extension).", + ] = "sketch", + extra_instructions: Annotated[ + str, + "Optional extra instructions or comments to embed in the sketch.", + ] = "", +) -> dict[str, Any]: + """ + Generate starter Arduino code files for the given circuit. + + Returns: + - files: list of { "name": str, "content": str } — ready for compile_project + - board_fqbn: str — detected board FQBN + """ + if not isinstance(circuit, dict): + return {"error": "circuit must be a JSON object."} + + sketch_content = generate_arduino_sketch(circuit, sketch_name=sketch_name) + + if extra_instructions: + header = f"// {extra_instructions}\n" + sketch_content = header + sketch_content + + board_fqbn: str = circuit.get("board_fqbn", "arduino:avr:uno") + + return { + "files": [{"name": f"{sketch_name}.ino", "content": sketch_content}], + "board_fqbn": board_fqbn, + } diff --git a/backend/app/mcp/wokwi.py b/backend/app/mcp/wokwi.py new file mode 100644 index 0000000..ac448f7 --- /dev/null +++ b/backend/app/mcp/wokwi.py @@ -0,0 +1,279 @@ +""" +Wokwi diagram.json parse/format utilities (Python port of frontend/src/utils/wokwiZip.ts). +Handles conversion between Wokwi diagram format and the Velxio internal circuit format. +""" + +from __future__ import annotations + +from typing import Any + +# --------------------------------------------------------------------------- +# Type aliases / data models (plain dicts for simplicity) +# --------------------------------------------------------------------------- + +# Wokwi diagram.json structure: +# { +# "version": 1, +# "author": "...", +# "editor": "wokwi", +# "parts": [ { "type": "...", "id": "...", "top": 0, "left": 0, "attrs": {} } ], +# "connections": [ ["partId:pinName", "partId:pinName", "color", []] ] +# } + +# --------------------------------------------------------------------------- +# Board type mappings +# --------------------------------------------------------------------------- + +WOKWI_TO_VELXIO_BOARD: dict[str, str] = { + "wokwi-arduino-uno": "arduino:avr:uno", + "wokwi-arduino-mega": "arduino:avr:mega", + "wokwi-arduino-nano": "arduino:avr:nano", + "wokwi-pi-pico": "rp2040:rp2040:rpipico", + "raspberry-pi-pico": "rp2040:rp2040:rpipico", + "wokwi-esp32-devkit-v1": "esp32:esp32:esp32", +} + +VELXIO_TO_WOKWI_BOARD: dict[str, str] = {v: k for k, v in WOKWI_TO_VELXIO_BOARD.items()} + +# Default board FQBN when none can be detected +DEFAULT_BOARD_FQBN = "arduino:avr:uno" + +# --------------------------------------------------------------------------- +# Color helpers +# --------------------------------------------------------------------------- + +COLOR_NAMES: dict[str, str] = { + "red": "#ff0000", + "green": "#00ff00", + "blue": "#0000ff", + "yellow": "#ffff00", + "orange": "#ffa500", + "purple": "#800080", + "white": "#ffffff", + "black": "#000000", + "cyan": "#00ffff", + "magenta": "#ff00ff", + "lime": "#00ff00", + "pink": "#ffc0cb", + "gray": "#808080", + "grey": "#808080", + "brown": "#a52a2a", + "gold": "#ffd700", + "silver": "#c0c0c0", +} + +HEX_TO_NAME: dict[str, str] = {v: k for k, v in COLOR_NAMES.items()} + + +def color_to_hex(color: str) -> str: + """Normalise a color string to lowercase hex (e.g. 'red' -> '#ff0000').""" + c = color.strip().lower() + if c in COLOR_NAMES: + return COLOR_NAMES[c] + if c.startswith("#"): + return c + return "#" + c + + +def hex_to_color_name(hex_color: str) -> str: + """Return a friendly color name if one exists, otherwise return the hex string.""" + normalized = hex_color.strip().lower() + return HEX_TO_NAME.get(normalized, hex_color) + + +# --------------------------------------------------------------------------- +# Wokwi → Velxio conversion +# --------------------------------------------------------------------------- + +BOARD_PART_TYPES = set(WOKWI_TO_VELXIO_BOARD.keys()) + + +def _detect_board_fqbn(parts: list[dict[str, Any]]) -> str: + """Return the board FQBN inferred from the parts list.""" + for part in parts: + part_type = part.get("type", "") + if part_type in WOKWI_TO_VELXIO_BOARD: + return WOKWI_TO_VELXIO_BOARD[part_type] + return DEFAULT_BOARD_FQBN + + +def parse_wokwi_diagram(diagram: dict[str, Any]) -> dict[str, Any]: + """ + Convert a Wokwi diagram.json payload into a Velxio circuit object. + + The returned object has the shape: + { + "board_fqbn": str, + "components": [ { "id", "type", "left", "top", "rotate", "attrs" } ], + "connections": [ { "from_part", "from_pin", "to_part", "to_pin", "color" } ], + "version": int, + } + """ + parts: list[dict[str, Any]] = diagram.get("parts", []) + raw_connections: list[Any] = diagram.get("connections", []) + + board_fqbn = _detect_board_fqbn(parts) + + # Build component list (skip board parts – they are implicit in board_fqbn) + components: list[dict[str, Any]] = [] + for part in parts: + components.append( + { + "id": part.get("id", ""), + "type": part.get("type", ""), + "left": float(part.get("left", 0)), + "top": float(part.get("top", 0)), + "rotate": int(part.get("rotate", 0)), + "attrs": dict(part.get("attrs", {})), + } + ) + + # Build connection list + connections: list[dict[str, Any]] = [] + for conn in raw_connections: + # Each connection is [from, to, color, segments] + if not isinstance(conn, (list, tuple)) or len(conn) < 2: + continue + from_endpoint: str = conn[0] + to_endpoint: str = conn[1] + color_raw: str = conn[2] if len(conn) > 2 else "green" + color = color_to_hex(color_raw) + + # "partId:pinName" → split at first ':' + from_parts = from_endpoint.split(":", 1) + to_parts = to_endpoint.split(":", 1) + connections.append( + { + "from_part": from_parts[0], + "from_pin": from_parts[1] if len(from_parts) > 1 else "", + "to_part": to_parts[0], + "to_pin": to_parts[1] if len(to_parts) > 1 else "", + "color": color, + } + ) + + return { + "board_fqbn": board_fqbn, + "components": components, + "connections": connections, + "version": int(diagram.get("version", 1)), + } + + +# --------------------------------------------------------------------------- +# Velxio → Wokwi conversion +# --------------------------------------------------------------------------- + + +def format_wokwi_diagram( + circuit: dict[str, Any], + author: str = "velxio", +) -> dict[str, Any]: + """ + Convert a Velxio circuit object back into a Wokwi diagram.json payload. + """ + board_fqbn: str = circuit.get("board_fqbn", DEFAULT_BOARD_FQBN) + wokwi_board_type: str = VELXIO_TO_WOKWI_BOARD.get(board_fqbn, "wokwi-arduino-uno") + + # Build parts list; inject board part first if not already present + raw_components: list[dict[str, Any]] = circuit.get("components", []) + + board_present = any(c.get("type") == wokwi_board_type for c in raw_components) + parts: list[dict[str, Any]] = [] + + if not board_present: + parts.append( + { + "type": wokwi_board_type, + "id": "uno", + "top": 0, + "left": 0, + "attrs": {}, + } + ) + + for comp in raw_components: + parts.append( + { + "type": comp.get("type", ""), + "id": comp.get("id", ""), + "top": comp.get("top", 0), + "left": comp.get("left", 0), + "rotate": comp.get("rotate", 0), + "attrs": comp.get("attrs", {}), + } + ) + + # Build connections list [ [from, to, color, []] ] + raw_connections: list[dict[str, Any]] = circuit.get("connections", []) + connections: list[list[Any]] = [] + for conn in raw_connections: + from_endpoint = f"{conn.get('from_part', '')}:{conn.get('from_pin', '')}" + to_endpoint = f"{conn.get('to_part', '')}:{conn.get('to_pin', '')}" + color = hex_to_color_name(conn.get("color", "green")) + connections.append([from_endpoint, to_endpoint, color, []]) + + return { + "version": int(circuit.get("version", 1)), + "author": author, + "editor": "velxio", + "parts": parts, + "connections": connections, + } + + +# --------------------------------------------------------------------------- +# Code template generation +# --------------------------------------------------------------------------- + +COMPONENT_PIN_TEMPLATES: dict[str, str] = { + "wokwi-led": "pinMode({pin}, OUTPUT); // LED", + "wokwi-pushbutton": "pinMode({pin}, INPUT_PULLUP); // Button", + "wokwi-buzzer": "pinMode({pin}, OUTPUT); // Buzzer", + "wokwi-servo": "// Servo on pin {pin}: use Servo library", +} + + +def generate_arduino_sketch(circuit: dict[str, Any], sketch_name: str = "sketch") -> str: + """ + Generate a minimal Arduino sketch (.ino) from a Velxio circuit definition. + + Returns the .ino file content as a string. + """ + components = circuit.get("components", []) + connections = circuit.get("connections", []) + + # Determine unique pins referenced by connections that involve the board + board_pin_map: dict[str, list[str]] = {} # component_id -> [pin_names] + board_part_ids = { + c["id"] for c in components if c.get("type", "") in BOARD_PART_TYPES + } + + for conn in connections: + for side in ("from", "to"): + part = conn.get(f"{side}_part", "") + pin = conn.get(f"{side}_pin", "") + if part in board_part_ids and pin: + other_side = "to" if side == "from" else "from" + other_part = conn.get(f"{other_side}_part", "") + board_pin_map.setdefault(other_part, []).append(pin) + + # Build setup/loop + includes: list[str] = [] + setup_lines: list[str] = ["void setup() {", " Serial.begin(9600);"] + loop_lines: list[str] = ["void loop() {", " // TODO: add your logic here"] + + for comp in components: + comp_id = comp.get("id", "") + comp_type = comp.get("type", "") + if comp_id in board_pin_map: + for pin in board_pin_map[comp_id]: + template = COMPONENT_PIN_TEMPLATES.get(comp_type) + if template: + setup_lines.append(f" {template.format(pin=pin)}") + + setup_lines.append("}") + loop_lines.append("}") + + lines = includes + [""] + setup_lines + [""] + loop_lines + [""] + return "\n".join(lines) diff --git a/backend/mcp_server.py b/backend/mcp_server.py new file mode 100644 index 0000000..2a24d9a --- /dev/null +++ b/backend/mcp_server.py @@ -0,0 +1,31 @@ +""" +Velxio MCP Server — stdio entry point + +Run this script to start the MCP server in stdio mode, which is compatible +with Claude Desktop and other stdio-based MCP clients. + +Usage: + python mcp_server.py + +Claude Desktop configuration (~/.claude/claude_desktop_config.json): + { + "mcpServers": { + "velxio": { + "command": "python", + "args": ["/path/to/velxio/backend/mcp_server.py"], + "env": {} + } + } + } +""" + +import sys +from pathlib import Path + +# Ensure the backend package is importable when run as a script +sys.path.insert(0, str(Path(__file__).parent)) + +from app.mcp.server import mcp # noqa: E402 + +if __name__ == "__main__": + mcp.run(transport="stdio") diff --git a/backend/mcp_sse_server.py b/backend/mcp_sse_server.py new file mode 100644 index 0000000..138a129 --- /dev/null +++ b/backend/mcp_sse_server.py @@ -0,0 +1,46 @@ +""" +Velxio MCP Server — HTTP/SSE entry point + +Run this script to start the MCP server using the SSE transport, which is +compatible with HTTP-based MCP clients (e.g. web applications, Cursor IDE). + +The SSE server runs on port 8002 by default (separate from the main FastAPI +backend on port 8001) to avoid Starlette version conflicts. + +Usage: + python mcp_sse_server.py [--port 8002] [--host 0.0.0.0] + +MCP client configuration (SSE transport): + { + "mcpServers": { + "velxio": { + "url": "http://localhost:8002/sse" + } + } + } +""" + +import sys +import argparse +from pathlib import Path + +import uvicorn + +# Ensure the backend package is importable when run as a script +sys.path.insert(0, str(Path(__file__).parent)) + +from app.mcp.server import mcp # noqa: E402 + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Velxio MCP SSE Server") + parser.add_argument("--host", default="0.0.0.0", help="Host to bind to") + parser.add_argument("--port", type=int, default=8002, help="Port to listen on") + args = parser.parse_args() + + print(f"Starting Velxio MCP SSE server on {args.host}:{args.port}") + print(f"SSE endpoint: http://{args.host}:{args.port}/sse") + print(f"Tools: compile_project, run_project, import_wokwi_json, export_wokwi_json,") + print(f" create_circuit, update_circuit, generate_code_files") + + sse_app = mcp.sse_app() + uvicorn.run(sse_app, host=args.host, port=args.port) diff --git a/backend/requirements.txt b/backend/requirements.txt index 48e00c9..cc9c282 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -11,3 +11,4 @@ bcrypt==4.0.1 httpx==0.27.0 authlib==1.3.1 email-validator==2.2.0 +mcp>=1.0.0 diff --git a/backend/tests/__init__.py b/backend/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/backend/tests/test_mcp_tools.py b/backend/tests/test_mcp_tools.py new file mode 100644 index 0000000..7c38d6c --- /dev/null +++ b/backend/tests/test_mcp_tools.py @@ -0,0 +1,416 @@ +""" +Tests for the Velxio MCP server tools. + +These tests exercise the tool functions directly (without a running MCP transport) +and mock the ArduinoCLIService to avoid requiring arduino-cli to be installed. +""" + +from __future__ import annotations + +import json +import sys +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest + +# Ensure the backend package is importable when running from the repo root +sys.path.insert(0, str(Path(__file__).parent.parent)) + + +# --------------------------------------------------------------------------- +# Wokwi utility tests +# --------------------------------------------------------------------------- + +from app.mcp.wokwi import ( + color_to_hex, + format_wokwi_diagram, + generate_arduino_sketch, + hex_to_color_name, + parse_wokwi_diagram, +) + + +class TestColorHelpers: + def test_named_color_to_hex(self): + assert color_to_hex("red") == "#ff0000" + assert color_to_hex("GREEN") == "#00ff00" + + def test_hex_passthrough(self): + assert color_to_hex("#1a2b3c") == "#1a2b3c" + + def test_hex_to_name(self): + assert hex_to_color_name("#ff0000") == "red" + assert hex_to_color_name("#unknown") == "#unknown" + + +SAMPLE_DIAGRAM = { + "version": 1, + "author": "test", + "editor": "wokwi", + "parts": [ + {"type": "wokwi-arduino-uno", "id": "uno", "top": 0, "left": 0, "attrs": {}}, + { + "type": "wokwi-led", + "id": "led1", + "top": 100, + "left": 200, + "rotate": 0, + "attrs": {"color": "red"}, + }, + { + "type": "wokwi-resistor", + "id": "r1", + "top": 150, + "left": 200, + "attrs": {"value": "220"}, + }, + ], + "connections": [ + ["uno:13", "led1:A", "green", []], + ["led1:C", "r1:1", "black", []], + ["r1:2", "uno:GND.1", "black", []], + ], +} + + +class TestParseWokwiDiagram: + def test_detects_board_fqbn(self): + result = parse_wokwi_diagram(SAMPLE_DIAGRAM) + assert result["board_fqbn"] == "arduino:avr:uno" + + def test_components_parsed(self): + result = parse_wokwi_diagram(SAMPLE_DIAGRAM) + ids = [c["id"] for c in result["components"]] + assert "uno" in ids + assert "led1" in ids + assert "r1" in ids + + def test_connections_parsed(self): + result = parse_wokwi_diagram(SAMPLE_DIAGRAM) + conns = result["connections"] + assert len(conns) == 3 + first = conns[0] + assert first["from_part"] == "uno" + assert first["from_pin"] == "13" + assert first["to_part"] == "led1" + assert first["to_pin"] == "A" + assert first["color"] == "#00ff00" # 'green' -> hex + + def test_version(self): + result = parse_wokwi_diagram(SAMPLE_DIAGRAM) + assert result["version"] == 1 + + def test_empty_diagram(self): + result = parse_wokwi_diagram({}) + assert result["board_fqbn"] == "arduino:avr:uno" + assert result["components"] == [] + assert result["connections"] == [] + + def test_unknown_board_defaults(self): + diagram = {"parts": [{"type": "unknown-board", "id": "b1", "attrs": {}}]} + result = parse_wokwi_diagram(diagram) + assert result["board_fqbn"] == "arduino:avr:uno" + + +class TestFormatWokwiDiagram: + def test_round_trip(self): + """parse → format should preserve the essential structure.""" + circuit = parse_wokwi_diagram(SAMPLE_DIAGRAM) + wokwi = format_wokwi_diagram(circuit) + + part_ids = [p["id"] for p in wokwi["parts"]] + assert "led1" in part_ids + assert "r1" in part_ids + + def test_board_injected(self): + circuit = { + "board_fqbn": "arduino:avr:uno", + "components": [], + "connections": [], + "version": 1, + } + wokwi = format_wokwi_diagram(circuit) + types = [p["type"] for p in wokwi["parts"]] + assert "wokwi-arduino-uno" in types + + def test_author_embedded(self): + circuit = parse_wokwi_diagram(SAMPLE_DIAGRAM) + wokwi = format_wokwi_diagram(circuit, author="alice") + assert wokwi["author"] == "alice" + + def test_connections_formatted(self): + circuit = parse_wokwi_diagram(SAMPLE_DIAGRAM) + wokwi = format_wokwi_diagram(circuit) + assert len(wokwi["connections"]) == 3 + # Each connection should be [from, to, color, []] + for conn in wokwi["connections"]: + assert isinstance(conn, list) + assert len(conn) == 4 + + +class TestGenerateArduinoSketch: + def test_generates_setup_and_loop(self): + circuit = parse_wokwi_diagram(SAMPLE_DIAGRAM) + sketch = generate_arduino_sketch(circuit) + assert "void setup()" in sketch + assert "void loop()" in sketch + + def test_contains_serial_begin(self): + circuit = parse_wokwi_diagram(SAMPLE_DIAGRAM) + sketch = generate_arduino_sketch(circuit) + assert "Serial.begin" in sketch + + +# --------------------------------------------------------------------------- +# MCP tool function tests +# --------------------------------------------------------------------------- + +from app.mcp.server import ( + compile_project, + create_circuit, + export_wokwi_json, + generate_code_files, + import_wokwi_json, + run_project, + update_circuit, +) + + +class TestImportWokwiJson: + @pytest.mark.asyncio + async def test_valid_diagram(self): + result = await import_wokwi_json(json.dumps(SAMPLE_DIAGRAM)) + assert "board_fqbn" in result + assert "components" in result + assert "connections" in result + + @pytest.mark.asyncio + async def test_invalid_json(self): + result = await import_wokwi_json("not-json{") + assert "error" in result + + @pytest.mark.asyncio + async def test_not_object(self): + result = await import_wokwi_json("[1, 2, 3]") + assert "error" in result + + +class TestExportWokwiJson: + @pytest.mark.asyncio + async def test_valid_circuit(self): + circuit = parse_wokwi_diagram(SAMPLE_DIAGRAM) + result = await export_wokwi_json(circuit) + assert "parts" in result + assert "connections" in result + assert result["editor"] == "velxio" + + @pytest.mark.asyncio + async def test_invalid_input(self): + result = await export_wokwi_json("not a dict") # type: ignore[arg-type] + assert "error" in result + + +class TestCreateCircuit: + @pytest.mark.asyncio + async def test_defaults(self): + result = await create_circuit() + assert result["board_fqbn"] == "arduino:avr:uno" + assert result["components"] == [] + assert result["connections"] == [] + + @pytest.mark.asyncio + async def test_with_components(self): + result = await create_circuit( + board_fqbn="arduino:avr:uno", + components=[{"id": "led1", "type": "wokwi-led", "left": 100, "top": 50}], + connections=[ + { + "from_part": "uno", + "from_pin": "13", + "to_part": "led1", + "to_pin": "A", + "color": "green", + } + ], + ) + assert len(result["components"]) == 1 + assert result["components"][0]["id"] == "led1" + assert len(result["connections"]) == 1 + + +class TestUpdateCircuit: + @pytest.mark.asyncio + async def test_add_component(self): + circuit = await create_circuit( + components=[{"id": "led1", "type": "wokwi-led"}] + ) + updated = await update_circuit( + circuit=circuit, + add_components=[{"id": "btn1", "type": "wokwi-pushbutton"}], + ) + ids = [c["id"] for c in updated["components"]] + assert "led1" in ids + assert "btn1" in ids + + @pytest.mark.asyncio + async def test_remove_component(self): + circuit = await create_circuit( + components=[ + {"id": "led1", "type": "wokwi-led"}, + {"id": "r1", "type": "wokwi-resistor"}, + ] + ) + updated = await update_circuit( + circuit=circuit, + remove_component_ids=["r1"], + ) + ids = [c["id"] for c in updated["components"]] + assert "led1" in ids + assert "r1" not in ids + + @pytest.mark.asyncio + async def test_change_board(self): + circuit = await create_circuit() + updated = await update_circuit( + circuit=circuit, + board_fqbn="rp2040:rp2040:rpipico", + ) + assert updated["board_fqbn"] == "rp2040:rp2040:rpipico" + + @pytest.mark.asyncio + async def test_add_and_remove_connections(self): + circuit = await create_circuit( + connections=[ + { + "from_part": "uno", + "from_pin": "13", + "to_part": "led1", + "to_pin": "A", + } + ] + ) + updated = await update_circuit( + circuit=circuit, + add_connections=[ + { + "from_part": "uno", + "from_pin": "12", + "to_part": "led2", + "to_pin": "A", + } + ], + remove_connections=[ + { + "from_part": "uno", + "from_pin": "13", + "to_part": "led1", + "to_pin": "A", + } + ], + ) + assert len(updated["connections"]) == 1 + assert updated["connections"][0]["from_pin"] == "12" + + @pytest.mark.asyncio + async def test_invalid_circuit(self): + result = await update_circuit(circuit="bad") # type: ignore[arg-type] + assert "error" in result + + +class TestGenerateCodeFiles: + @pytest.mark.asyncio + async def test_generates_ino_file(self): + circuit = await create_circuit() + result = await generate_code_files(circuit=circuit) + assert len(result["files"]) == 1 + assert result["files"][0]["name"] == "sketch.ino" + + @pytest.mark.asyncio + async def test_custom_sketch_name(self): + circuit = await create_circuit() + result = await generate_code_files(circuit=circuit, sketch_name="blink") + assert result["files"][0]["name"] == "blink.ino" + + @pytest.mark.asyncio + async def test_extra_instructions(self): + circuit = await create_circuit() + result = await generate_code_files( + circuit=circuit, + extra_instructions="Generated by agent", + ) + assert "Generated by agent" in result["files"][0]["content"] + + @pytest.mark.asyncio + async def test_invalid_circuit(self): + result = await generate_code_files(circuit="bad") # type: ignore[arg-type] + assert "error" in result + + +class TestCompileProject: + @pytest.mark.asyncio + async def test_missing_keys(self): + result = await compile_project(files=[{"name": "sketch.ino"}]) + assert result["success"] is False + assert "content" in result["error"] + + @pytest.mark.asyncio + async def test_calls_arduino_cli(self): + mock_result = { + "success": True, + "hex_content": ":00000001FF", + "binary_content": None, + "binary_type": None, + "stdout": "", + "stderr": "", + } + with patch( + "app.mcp.server._arduino.compile", + new=AsyncMock(return_value=mock_result), + ): + result = await compile_project( + files=[{"name": "sketch.ino", "content": "void setup(){} void loop(){}"}] + ) + assert result["success"] is True + assert result["hex_content"] is not None + + +class TestRunProject: + @pytest.mark.asyncio + async def test_simulation_ready_flag_on_success(self): + mock_result = { + "success": True, + "hex_content": ":00000001FF", + "binary_content": None, + "binary_type": None, + "stdout": "", + "stderr": "", + } + with patch( + "app.mcp.server._arduino.compile", + new=AsyncMock(return_value=mock_result), + ): + result = await run_project( + files=[{"name": "sketch.ino", "content": "void setup(){} void loop(){}"}] + ) + assert result["simulation_ready"] is True + + @pytest.mark.asyncio + async def test_simulation_ready_flag_on_failure(self): + mock_result = { + "success": False, + "hex_content": None, + "binary_content": None, + "binary_type": None, + "stdout": "", + "stderr": "error", + "error": "Compilation failed", + } + with patch( + "app.mcp.server._arduino.compile", + new=AsyncMock(return_value=mock_result), + ): + result = await run_project( + files=[{"name": "sketch.ino", "content": "bad code{{{"}] + ) + assert result["simulation_ready"] is False diff --git a/doc/MCP.md b/doc/MCP.md new file mode 100644 index 0000000..482a543 --- /dev/null +++ b/doc/MCP.md @@ -0,0 +1,300 @@ +# Velxio MCP Server + +Velxio exposes a [Model Context Protocol](https://modelcontextprotocol.io/) (MCP) server that allows AI agents (e.g. Claude, Cursor) to: + +- **Create and update circuits** using a structured JSON format +- **Import and export** circuits in the Wokwi `diagram.json` format +- **Generate Arduino code** from circuit definitions +- **Compile projects** and receive structured results (hex/binary, logs) + +--- + +## Tools + +| Tool | Description | +|------|-------------| +| `compile_project` | Compile Arduino sketch files → Intel HEX / binary | +| `run_project` | Compile and mark artifact as simulation-ready | +| `import_wokwi_json` | Parse a Wokwi `diagram.json` → Velxio circuit | +| `export_wokwi_json` | Serialise a Velxio circuit → Wokwi `diagram.json` | +| `create_circuit` | Create a new circuit definition | +| `update_circuit` | Merge changes into an existing circuit | +| `generate_code_files` | Generate starter `.ino` code from a circuit | + +--- + +## Transport Options + +### 1. stdio (Claude Desktop / CLI agents) + +Run the MCP server as a child process: + +```bash +cd backend +python mcp_server.py +``` + +**Claude Desktop config** (`~/.claude/claude_desktop_config.json`): + +```json +{ + "mcpServers": { + "velxio": { + "command": "python", + "args": ["/absolute/path/to/velxio/backend/mcp_server.py"] + } + } +} +``` + +### 2. SSE / HTTP (web agents, Cursor IDE) + +Run the MCP SSE server on a separate port (default: **8002**): + +```bash +cd backend +python mcp_sse_server.py --port 8002 +``` + +**MCP client config (SSE transport)**: + +```json +{ + "mcpServers": { + "velxio": { + "url": "http://localhost:8002/sse" + } + } +} +``` + +> **Note**: The SSE server runs separately from the main FastAPI backend (port 8001) to avoid Starlette version conflicts. Both can run simultaneously. + +--- + +## Setup + +1. **Install dependencies**: + + ```bash + cd backend + pip install -r requirements.txt + ``` + +2. **Ensure arduino-cli is installed** (required for `compile_project` / `run_project`): + + ```bash + arduino-cli version + arduino-cli core update-index + arduino-cli core install arduino:avr + ``` + +--- + +## Example Walkthroughs + +### Example 1 — Blink LED (from scratch) + +**Step 1** — Create a circuit: + +```json +{ + "tool": "create_circuit", + "arguments": { + "board_fqbn": "arduino:avr:uno", + "components": [ + { "id": "led1", "type": "wokwi-led", "left": 150, "top": 100, "attrs": { "color": "red" } }, + { "id": "r1", "type": "wokwi-resistor", "left": 150, "top": 180, "attrs": { "value": "220" } } + ], + "connections": [ + { "from_part": "uno", "from_pin": "13", "to_part": "led1", "to_pin": "A", "color": "green" }, + { "from_part": "led1", "from_pin": "C", "to_part": "r1", "to_pin": "1", "color": "black" }, + { "from_part": "r1", "from_pin": "2", "to_part": "uno", "to_pin": "GND.1", "color": "black" } + ] + } +} +``` + +**Step 2** — Generate code: + +```json +{ + "tool": "generate_code_files", + "arguments": { + "circuit": "", + "sketch_name": "blink", + "extra_instructions": "Blink the red LED every 500ms" + } +} +``` + +**Step 3** — Compile the generated code (edit the sketch content as needed): + +```json +{ + "tool": "compile_project", + "arguments": { + "files": [ + { + "name": "blink.ino", + "content": "void setup() { pinMode(13, OUTPUT); }\nvoid loop() { digitalWrite(13, HIGH); delay(500); digitalWrite(13, LOW); delay(500); }" + } + ], + "board": "arduino:avr:uno" + } +} +``` + +**Response**: + +```json +{ + "success": true, + "hex_content": ":100000000C9434000C943E000C943E000C943E...", + "binary_content": null, + "binary_type": null, + "stdout": "", + "stderr": "" +} +``` + +--- + +### Example 2 — Import a Wokwi Circuit + +**Import**: + +```json +{ + "tool": "import_wokwi_json", + "arguments": { + "diagram_json": "{\"version\":1,\"author\":\"example\",\"editor\":\"wokwi\",\"parts\":[{\"type\":\"wokwi-arduino-uno\",\"id\":\"uno\",\"top\":0,\"left\":0,\"attrs\":{}},{\"type\":\"wokwi-led\",\"id\":\"led1\",\"top\":100,\"left\":200,\"attrs\":{\"color\":\"green\"}}],\"connections\":[[\"uno:13\",\"led1:A\",\"green\",[]]]}" + } +} +``` + +**Response**: + +```json +{ + "board_fqbn": "arduino:avr:uno", + "components": [ + { "id": "uno", "type": "wokwi-arduino-uno", "left": 0, "top": 0, "rotate": 0, "attrs": {} }, + { "id": "led1", "type": "wokwi-led", "left": 200, "top": 100, "rotate": 0, "attrs": { "color": "green" } } + ], + "connections": [ + { "from_part": "uno", "from_pin": "13", "to_part": "led1", "to_pin": "A", "color": "#00ff00" } + ], + "version": 1 +} +``` + +--- + +### Example 3 — Export to Wokwi Format + +```json +{ + "tool": "export_wokwi_json", + "arguments": { + "circuit": { + "board_fqbn": "arduino:avr:uno", + "components": [ + { "id": "led1", "type": "wokwi-led", "left": 200, "top": 100, "rotate": 0, "attrs": {} } + ], + "connections": [ + { "from_part": "uno", "from_pin": "13", "to_part": "led1", "to_pin": "A", "color": "green" } + ], + "version": 1 + }, + "author": "my-agent" + } +} +``` + +**Response** (Wokwi diagram.json format): + +```json +{ + "version": 1, + "author": "my-agent", + "editor": "velxio", + "parts": [ + { "type": "wokwi-arduino-uno", "id": "uno", "top": 0, "left": 0, "attrs": {} }, + { "type": "wokwi-led", "id": "led1", "top": 100, "left": 200, "rotate": 0, "attrs": {} } + ], + "connections": [ + ["uno:13", "led1:A", "green", []] + ] +} +``` + +--- + +## Circuit Data Format + +Velxio circuits are plain JSON objects: + +```json +{ + "board_fqbn": "arduino:avr:uno", + "version": 1, + "components": [ + { + "id": "led1", + "type": "wokwi-led", + "left": 200, + "top": 100, + "rotate": 0, + "attrs": { "color": "red" } + } + ], + "connections": [ + { + "from_part": "uno", + "from_pin": "13", + "to_part": "led1", + "to_pin": "A", + "color": "green" + } + ] +} +``` + +### Supported Board FQBNs + +| Board | FQBN | +|-------|------| +| Arduino Uno | `arduino:avr:uno` | +| Arduino Mega | `arduino:avr:mega` | +| Arduino Nano | `arduino:avr:nano` | +| Raspberry Pi Pico | `rp2040:rp2040:rpipico` | +| ESP32 DevKit | `esp32:esp32:esp32` | + +### Common Component Types (Wokwi element names) + +- `wokwi-led` — LED (attrs: `color`) +- `wokwi-resistor` — Resistor (attrs: `value` in Ω) +- `wokwi-pushbutton` — Push button +- `wokwi-buzzer` — Passive buzzer +- `wokwi-servo` — Servo motor +- `wokwi-lcd1602` — 16×2 LCD display +- `wokwi-neopixel` — NeoPixel RGB LED + +--- + +## Sandboxing & Limits + +- Compilation runs in a **temporary directory** cleaned up after each call. +- arduino-cli is invoked as a **subprocess** with no elevated privileges. +- There is no explicit CPU/memory timeout in the default configuration. For production deployments, set `COMPILATION_TIMEOUT_SECONDS` in the environment and enforce it at the process level. + +--- + +## Running Tests + +```bash +cd backend +pip install pytest pytest-asyncio +python -m pytest tests/test_mcp_tools.py -v +```