feat: integrate MCP (Model Context Protocol) for agent compatibility

Co-authored-by: davidmonterocrespo24 <47928504+davidmonterocrespo24@users.noreply.github.com>
pull/18/head
copilot-swe-agent[bot] 2026-03-11 15:29:55 +00:00
parent bcdb2ce915
commit e7b4854f63
9 changed files with 1482 additions and 0 deletions

View File

@ -0,0 +1,3 @@
# MCP (Model Context Protocol) server for Velxio
# Exposes tools for circuit authoring, Wokwi JSON import/export,
# code generation, and compilation.

406
backend/app/mcp/server.py Normal file
View File

@ -0,0 +1,406 @@
"""
Velxio MCP Server
Exposes the following tools to MCP-compatible agents (e.g. Claude):
- compile_project Compile Arduino sketch files hex / binary
- run_project Compile and return simulation-ready artifacts
- import_wokwi_json Parse a Wokwi diagram.json Velxio circuit
- export_wokwi_json Serialise a Velxio circuit Wokwi diagram.json
- create_circuit Create a new circuit definition
- update_circuit Merge changes into an existing circuit definition
- generate_code_files Generate starter Arduino code from a circuit
Transport:
- stdio run `python mcp_server.py` for Claude Desktop / CLI agents
- SSE mounted at /mcp in the FastAPI app for HTTP-based agents
"""
from __future__ import annotations
import json
import sys
from typing import Annotated, Any
from mcp.server.fastmcp import FastMCP
from app.mcp.wokwi import (
format_wokwi_diagram,
generate_arduino_sketch,
parse_wokwi_diagram,
)
from app.services.arduino_cli import ArduinoCLIService
# ---------------------------------------------------------------------------
# Server setup
# ---------------------------------------------------------------------------
mcp = FastMCP(
name="velxio",
instructions=(
"Velxio MCP server — create circuits, import/export Wokwi JSON, "
"generate Arduino code, and compile projects."
),
)
_arduino = ArduinoCLIService()
# ---------------------------------------------------------------------------
# compile_project
# ---------------------------------------------------------------------------
@mcp.tool()
async def compile_project(
files: Annotated[
list[dict[str, str]],
"List of source files. Each item must have 'name' (filename, e.g. 'sketch.ino') "
"and 'content' (file text).",
],
board: Annotated[
str,
"Arduino board FQBN, e.g. 'arduino:avr:uno' or 'rp2040:rp2040:rpipico'. "
"Defaults to 'arduino:avr:uno'.",
] = "arduino:avr:uno",
) -> dict[str, Any]:
"""
Compile one or more Arduino sketch files and return the compiled artifact.
Returns a dict with:
- success (bool)
- hex_content (str | null) Intel HEX for AVR boards
- binary_content (str | null) Base-64 .bin/.uf2 for RP2040
- binary_type (str | null) 'bin' or 'uf2'
- stdout (str)
- stderr (str)
- error (str | null)
"""
for f in files:
if "name" not in f or "content" not in f:
return {
"success": False,
"error": "Each file entry must have 'name' and 'content' keys.",
"stdout": "",
"stderr": "",
}
try:
result = await _arduino.compile(files, board)
return result
except Exception as exc: # pragma: no cover
return {
"success": False,
"error": str(exc),
"stdout": "",
"stderr": "",
}
# ---------------------------------------------------------------------------
# run_project
# ---------------------------------------------------------------------------
@mcp.tool()
async def run_project(
files: Annotated[
list[dict[str, str]],
"List of source files (same format as compile_project).",
],
board: Annotated[str, "Board FQBN (default: 'arduino:avr:uno')."] = "arduino:avr:uno",
) -> dict[str, Any]:
"""
Compile the project and return simulation-ready artifacts.
The Velxio frontend can load the returned hex_content / binary_content
directly into its AVR / RP2040 emulator. Actual execution happens
client-side in the browser.
Returns the same payload as compile_project plus a 'simulation_ready' flag.
"""
result = await compile_project(files=files, board=board)
result["simulation_ready"] = result.get("success", False)
return result
# ---------------------------------------------------------------------------
# import_wokwi_json
# ---------------------------------------------------------------------------
@mcp.tool()
async def import_wokwi_json(
diagram_json: Annotated[
str,
"Wokwi diagram.json content as a JSON string. "
"Must contain at minimum a 'parts' array.",
],
) -> dict[str, Any]:
"""
Parse a Wokwi diagram.json payload and return a Velxio circuit object.
The returned circuit can be passed directly to export_wokwi_json,
generate_code_files, compile_project, or saved as a Velxio project.
Returns:
- board_fqbn (str) Detected Arduino board FQBN
- components (list) List of component objects
- connections (list) List of connection objects
- version (int)
"""
try:
diagram = json.loads(diagram_json)
except json.JSONDecodeError as exc:
return {"error": f"Invalid JSON: {exc}"}
if not isinstance(diagram, dict):
return {"error": "diagram_json must be a JSON object."}
return parse_wokwi_diagram(diagram)
# ---------------------------------------------------------------------------
# export_wokwi_json
# ---------------------------------------------------------------------------
@mcp.tool()
async def export_wokwi_json(
circuit: Annotated[
dict[str, Any],
"Velxio circuit object with 'components', 'connections', and 'board_fqbn'.",
],
author: Annotated[str, "Author name to embed in the diagram (default: 'velxio')."] = "velxio",
) -> dict[str, Any]:
"""
Convert a Velxio circuit object into a Wokwi diagram.json payload.
The returned payload is compatible with the Wokwi simulator and can be
imported using the Wokwi zip import feature in Velxio.
Returns the Wokwi diagram dict (version, author, editor, parts, connections).
"""
if not isinstance(circuit, dict):
return {"error": "circuit must be a JSON object."}
return format_wokwi_diagram(circuit, author=author)
# ---------------------------------------------------------------------------
# create_circuit
# ---------------------------------------------------------------------------
@mcp.tool()
async def create_circuit(
board_fqbn: Annotated[
str,
"Arduino board FQBN. e.g. 'arduino:avr:uno', 'rp2040:rp2040:rpipico'.",
] = "arduino:avr:uno",
components: Annotated[
list[dict[str, Any]] | None,
"List of component objects. Each item may have: "
"id (str), type (str, Wokwi element type), left (number), top (number), "
"rotate (number), attrs (object).",
] = None,
connections: Annotated[
list[dict[str, Any]] | None,
"List of connection objects. Each item may have: "
"from_part (str), from_pin (str), to_part (str), to_pin (str), color (str).",
] = None,
) -> dict[str, Any]:
"""
Create a new Velxio circuit definition.
Example component types: wokwi-led, wokwi-pushbutton, wokwi-resistor,
wokwi-buzzer, wokwi-servo, wokwi-lcd1602.
Example connection:
{ "from_part": "uno", "from_pin": "13", "to_part": "led1", "to_pin": "A",
"color": "green" }
Returns the new circuit object (board_fqbn, components, connections, version).
"""
components_list = components if components is not None else []
connections_list = connections if connections is not None else []
# Normalise components
normalised_components: list[dict[str, Any]] = []
for i, comp in enumerate(components_list):
normalised_components.append(
{
"id": comp.get("id", f"comp{i}"),
"type": comp.get("type", ""),
"left": float(comp.get("left", 0)),
"top": float(comp.get("top", 0)),
"rotate": int(comp.get("rotate", 0)),
"attrs": dict(comp.get("attrs", {})),
}
)
# Normalise connections
normalised_connections: list[dict[str, Any]] = []
for conn in connections_list:
normalised_connections.append(
{
"from_part": conn.get("from_part", ""),
"from_pin": conn.get("from_pin", ""),
"to_part": conn.get("to_part", ""),
"to_pin": conn.get("to_pin", ""),
"color": conn.get("color", "green"),
}
)
return {
"board_fqbn": board_fqbn,
"components": normalised_components,
"connections": normalised_connections,
"version": 1,
}
# ---------------------------------------------------------------------------
# update_circuit
# ---------------------------------------------------------------------------
@mcp.tool()
async def update_circuit(
circuit: Annotated[
dict[str, Any],
"Existing Velxio circuit object to update.",
],
add_components: Annotated[
list[dict[str, Any]] | None,
"Components to add. Merged after existing components.",
] = None,
remove_component_ids: Annotated[
list[str] | None,
"IDs of components to remove.",
] = None,
add_connections: Annotated[
list[dict[str, Any]] | None,
"Connections to add.",
] = None,
remove_connections: Annotated[
list[dict[str, Any]] | None,
"Connections to remove (matched by from_part+from_pin+to_part+to_pin).",
] = None,
board_fqbn: Annotated[
str | None,
"If provided, replaces the circuit board_fqbn.",
] = None,
) -> dict[str, Any]:
"""
Merge changes into an existing Velxio circuit definition.
Supports adding/removing components and connections, and changing the board.
Returns the updated circuit object.
"""
if not isinstance(circuit, dict):
return {"error": "circuit must be a JSON object."}
import copy
updated = copy.deepcopy(circuit)
if board_fqbn is not None:
updated["board_fqbn"] = board_fqbn
# Remove components
if remove_component_ids:
remove_set = set(remove_component_ids)
updated["components"] = [
c for c in updated.get("components", []) if c.get("id") not in remove_set
]
# Add components
existing_ids = {c.get("id") for c in updated.get("components", [])}
for i, comp in enumerate(add_components or []):
comp_id = comp.get("id", f"comp_new_{i}")
if comp_id in existing_ids:
comp_id = f"{comp_id}_new"
updated.setdefault("components", []).append(
{
"id": comp_id,
"type": comp.get("type", ""),
"left": float(comp.get("left", 0)),
"top": float(comp.get("top", 0)),
"rotate": int(comp.get("rotate", 0)),
"attrs": dict(comp.get("attrs", {})),
}
)
# Remove connections (exact match)
if remove_connections:
def _conn_key(c: dict[str, Any]) -> tuple[str, str, str, str]:
return (
c.get("from_part", ""),
c.get("from_pin", ""),
c.get("to_part", ""),
c.get("to_pin", ""),
)
remove_keys = {_conn_key(c) for c in remove_connections}
updated["connections"] = [
c for c in updated.get("connections", []) if _conn_key(c) not in remove_keys
]
# Add connections
for conn in (add_connections or []):
updated.setdefault("connections", []).append(
{
"from_part": conn.get("from_part", ""),
"from_pin": conn.get("from_pin", ""),
"to_part": conn.get("to_part", ""),
"to_pin": conn.get("to_pin", ""),
"color": conn.get("color", "green"),
}
)
return updated
# ---------------------------------------------------------------------------
# generate_code_files
# ---------------------------------------------------------------------------
@mcp.tool()
async def generate_code_files(
circuit: Annotated[
dict[str, Any],
"Velxio circuit object (from create_circuit or import_wokwi_json).",
],
sketch_name: Annotated[
str,
"Base name for the generated sketch file (without extension).",
] = "sketch",
extra_instructions: Annotated[
str,
"Optional extra instructions or comments to embed in the sketch.",
] = "",
) -> dict[str, Any]:
"""
Generate starter Arduino code files for the given circuit.
Returns:
- files: list of { "name": str, "content": str } ready for compile_project
- board_fqbn: str detected board FQBN
"""
if not isinstance(circuit, dict):
return {"error": "circuit must be a JSON object."}
sketch_content = generate_arduino_sketch(circuit, sketch_name=sketch_name)
if extra_instructions:
header = f"// {extra_instructions}\n"
sketch_content = header + sketch_content
board_fqbn: str = circuit.get("board_fqbn", "arduino:avr:uno")
return {
"files": [{"name": f"{sketch_name}.ino", "content": sketch_content}],
"board_fqbn": board_fqbn,
}

279
backend/app/mcp/wokwi.py Normal file
View File

@ -0,0 +1,279 @@
"""
Wokwi diagram.json parse/format utilities (Python port of frontend/src/utils/wokwiZip.ts).
Handles conversion between Wokwi diagram format and the Velxio internal circuit format.
"""
from __future__ import annotations
from typing import Any
# ---------------------------------------------------------------------------
# Type aliases / data models (plain dicts for simplicity)
# ---------------------------------------------------------------------------
# Wokwi diagram.json structure:
# {
# "version": 1,
# "author": "...",
# "editor": "wokwi",
# "parts": [ { "type": "...", "id": "...", "top": 0, "left": 0, "attrs": {} } ],
# "connections": [ ["partId:pinName", "partId:pinName", "color", []] ]
# }
# ---------------------------------------------------------------------------
# Board type mappings
# ---------------------------------------------------------------------------
WOKWI_TO_VELXIO_BOARD: dict[str, str] = {
"wokwi-arduino-uno": "arduino:avr:uno",
"wokwi-arduino-mega": "arduino:avr:mega",
"wokwi-arduino-nano": "arduino:avr:nano",
"wokwi-pi-pico": "rp2040:rp2040:rpipico",
"raspberry-pi-pico": "rp2040:rp2040:rpipico",
"wokwi-esp32-devkit-v1": "esp32:esp32:esp32",
}
VELXIO_TO_WOKWI_BOARD: dict[str, str] = {v: k for k, v in WOKWI_TO_VELXIO_BOARD.items()}
# Default board FQBN when none can be detected
DEFAULT_BOARD_FQBN = "arduino:avr:uno"
# ---------------------------------------------------------------------------
# Color helpers
# ---------------------------------------------------------------------------
COLOR_NAMES: dict[str, str] = {
"red": "#ff0000",
"green": "#00ff00",
"blue": "#0000ff",
"yellow": "#ffff00",
"orange": "#ffa500",
"purple": "#800080",
"white": "#ffffff",
"black": "#000000",
"cyan": "#00ffff",
"magenta": "#ff00ff",
"lime": "#00ff00",
"pink": "#ffc0cb",
"gray": "#808080",
"grey": "#808080",
"brown": "#a52a2a",
"gold": "#ffd700",
"silver": "#c0c0c0",
}
HEX_TO_NAME: dict[str, str] = {v: k for k, v in COLOR_NAMES.items()}
def color_to_hex(color: str) -> str:
"""Normalise a color string to lowercase hex (e.g. 'red' -> '#ff0000')."""
c = color.strip().lower()
if c in COLOR_NAMES:
return COLOR_NAMES[c]
if c.startswith("#"):
return c
return "#" + c
def hex_to_color_name(hex_color: str) -> str:
"""Return a friendly color name if one exists, otherwise return the hex string."""
normalized = hex_color.strip().lower()
return HEX_TO_NAME.get(normalized, hex_color)
# ---------------------------------------------------------------------------
# Wokwi → Velxio conversion
# ---------------------------------------------------------------------------
BOARD_PART_TYPES = set(WOKWI_TO_VELXIO_BOARD.keys())
def _detect_board_fqbn(parts: list[dict[str, Any]]) -> str:
"""Return the board FQBN inferred from the parts list."""
for part in parts:
part_type = part.get("type", "")
if part_type in WOKWI_TO_VELXIO_BOARD:
return WOKWI_TO_VELXIO_BOARD[part_type]
return DEFAULT_BOARD_FQBN
def parse_wokwi_diagram(diagram: dict[str, Any]) -> dict[str, Any]:
"""
Convert a Wokwi diagram.json payload into a Velxio circuit object.
The returned object has the shape:
{
"board_fqbn": str,
"components": [ { "id", "type", "left", "top", "rotate", "attrs" } ],
"connections": [ { "from_part", "from_pin", "to_part", "to_pin", "color" } ],
"version": int,
}
"""
parts: list[dict[str, Any]] = diagram.get("parts", [])
raw_connections: list[Any] = diagram.get("connections", [])
board_fqbn = _detect_board_fqbn(parts)
# Build component list (skip board parts they are implicit in board_fqbn)
components: list[dict[str, Any]] = []
for part in parts:
components.append(
{
"id": part.get("id", ""),
"type": part.get("type", ""),
"left": float(part.get("left", 0)),
"top": float(part.get("top", 0)),
"rotate": int(part.get("rotate", 0)),
"attrs": dict(part.get("attrs", {})),
}
)
# Build connection list
connections: list[dict[str, Any]] = []
for conn in raw_connections:
# Each connection is [from, to, color, segments]
if not isinstance(conn, (list, tuple)) or len(conn) < 2:
continue
from_endpoint: str = conn[0]
to_endpoint: str = conn[1]
color_raw: str = conn[2] if len(conn) > 2 else "green"
color = color_to_hex(color_raw)
# "partId:pinName" → split at first ':'
from_parts = from_endpoint.split(":", 1)
to_parts = to_endpoint.split(":", 1)
connections.append(
{
"from_part": from_parts[0],
"from_pin": from_parts[1] if len(from_parts) > 1 else "",
"to_part": to_parts[0],
"to_pin": to_parts[1] if len(to_parts) > 1 else "",
"color": color,
}
)
return {
"board_fqbn": board_fqbn,
"components": components,
"connections": connections,
"version": int(diagram.get("version", 1)),
}
# ---------------------------------------------------------------------------
# Velxio → Wokwi conversion
# ---------------------------------------------------------------------------
def format_wokwi_diagram(
circuit: dict[str, Any],
author: str = "velxio",
) -> dict[str, Any]:
"""
Convert a Velxio circuit object back into a Wokwi diagram.json payload.
"""
board_fqbn: str = circuit.get("board_fqbn", DEFAULT_BOARD_FQBN)
wokwi_board_type: str = VELXIO_TO_WOKWI_BOARD.get(board_fqbn, "wokwi-arduino-uno")
# Build parts list; inject board part first if not already present
raw_components: list[dict[str, Any]] = circuit.get("components", [])
board_present = any(c.get("type") == wokwi_board_type for c in raw_components)
parts: list[dict[str, Any]] = []
if not board_present:
parts.append(
{
"type": wokwi_board_type,
"id": "uno",
"top": 0,
"left": 0,
"attrs": {},
}
)
for comp in raw_components:
parts.append(
{
"type": comp.get("type", ""),
"id": comp.get("id", ""),
"top": comp.get("top", 0),
"left": comp.get("left", 0),
"rotate": comp.get("rotate", 0),
"attrs": comp.get("attrs", {}),
}
)
# Build connections list [ [from, to, color, []] ]
raw_connections: list[dict[str, Any]] = circuit.get("connections", [])
connections: list[list[Any]] = []
for conn in raw_connections:
from_endpoint = f"{conn.get('from_part', '')}:{conn.get('from_pin', '')}"
to_endpoint = f"{conn.get('to_part', '')}:{conn.get('to_pin', '')}"
color = hex_to_color_name(conn.get("color", "green"))
connections.append([from_endpoint, to_endpoint, color, []])
return {
"version": int(circuit.get("version", 1)),
"author": author,
"editor": "velxio",
"parts": parts,
"connections": connections,
}
# ---------------------------------------------------------------------------
# Code template generation
# ---------------------------------------------------------------------------
COMPONENT_PIN_TEMPLATES: dict[str, str] = {
"wokwi-led": "pinMode({pin}, OUTPUT); // LED",
"wokwi-pushbutton": "pinMode({pin}, INPUT_PULLUP); // Button",
"wokwi-buzzer": "pinMode({pin}, OUTPUT); // Buzzer",
"wokwi-servo": "// Servo on pin {pin}: use Servo library",
}
def generate_arduino_sketch(circuit: dict[str, Any], sketch_name: str = "sketch") -> str:
"""
Generate a minimal Arduino sketch (.ino) from a Velxio circuit definition.
Returns the .ino file content as a string.
"""
components = circuit.get("components", [])
connections = circuit.get("connections", [])
# Determine unique pins referenced by connections that involve the board
board_pin_map: dict[str, list[str]] = {} # component_id -> [pin_names]
board_part_ids = {
c["id"] for c in components if c.get("type", "") in BOARD_PART_TYPES
}
for conn in connections:
for side in ("from", "to"):
part = conn.get(f"{side}_part", "")
pin = conn.get(f"{side}_pin", "")
if part in board_part_ids and pin:
other_side = "to" if side == "from" else "from"
other_part = conn.get(f"{other_side}_part", "")
board_pin_map.setdefault(other_part, []).append(pin)
# Build setup/loop
includes: list[str] = []
setup_lines: list[str] = ["void setup() {", " Serial.begin(9600);"]
loop_lines: list[str] = ["void loop() {", " // TODO: add your logic here"]
for comp in components:
comp_id = comp.get("id", "")
comp_type = comp.get("type", "")
if comp_id in board_pin_map:
for pin in board_pin_map[comp_id]:
template = COMPONENT_PIN_TEMPLATES.get(comp_type)
if template:
setup_lines.append(f" {template.format(pin=pin)}")
setup_lines.append("}")
loop_lines.append("}")
lines = includes + [""] + setup_lines + [""] + loop_lines + [""]
return "\n".join(lines)

31
backend/mcp_server.py Normal file
View File

@ -0,0 +1,31 @@
"""
Velxio MCP Server stdio entry point
Run this script to start the MCP server in stdio mode, which is compatible
with Claude Desktop and other stdio-based MCP clients.
Usage:
python mcp_server.py
Claude Desktop configuration (~/.claude/claude_desktop_config.json):
{
"mcpServers": {
"velxio": {
"command": "python",
"args": ["/path/to/velxio/backend/mcp_server.py"],
"env": {}
}
}
}
"""
import sys
from pathlib import Path
# Ensure the backend package is importable when run as a script
sys.path.insert(0, str(Path(__file__).parent))
from app.mcp.server import mcp # noqa: E402
if __name__ == "__main__":
mcp.run(transport="stdio")

46
backend/mcp_sse_server.py Normal file
View File

@ -0,0 +1,46 @@
"""
Velxio MCP Server HTTP/SSE entry point
Run this script to start the MCP server using the SSE transport, which is
compatible with HTTP-based MCP clients (e.g. web applications, Cursor IDE).
The SSE server runs on port 8002 by default (separate from the main FastAPI
backend on port 8001) to avoid Starlette version conflicts.
Usage:
python mcp_sse_server.py [--port 8002] [--host 0.0.0.0]
MCP client configuration (SSE transport):
{
"mcpServers": {
"velxio": {
"url": "http://localhost:8002/sse"
}
}
}
"""
import sys
import argparse
from pathlib import Path
import uvicorn
# Ensure the backend package is importable when run as a script
sys.path.insert(0, str(Path(__file__).parent))
from app.mcp.server import mcp # noqa: E402
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Velxio MCP SSE Server")
parser.add_argument("--host", default="0.0.0.0", help="Host to bind to")
parser.add_argument("--port", type=int, default=8002, help="Port to listen on")
args = parser.parse_args()
print(f"Starting Velxio MCP SSE server on {args.host}:{args.port}")
print(f"SSE endpoint: http://{args.host}:{args.port}/sse")
print(f"Tools: compile_project, run_project, import_wokwi_json, export_wokwi_json,")
print(f" create_circuit, update_circuit, generate_code_files")
sse_app = mcp.sse_app()
uvicorn.run(sse_app, host=args.host, port=args.port)

View File

@ -11,3 +11,4 @@ bcrypt==4.0.1
httpx==0.27.0
authlib==1.3.1
email-validator==2.2.0
mcp>=1.0.0

View File

View File

@ -0,0 +1,416 @@
"""
Tests for the Velxio MCP server tools.
These tests exercise the tool functions directly (without a running MCP transport)
and mock the ArduinoCLIService to avoid requiring arduino-cli to be installed.
"""
from __future__ import annotations
import json
import sys
from pathlib import Path
from unittest.mock import AsyncMock, patch
import pytest
# Ensure the backend package is importable when running from the repo root
sys.path.insert(0, str(Path(__file__).parent.parent))
# ---------------------------------------------------------------------------
# Wokwi utility tests
# ---------------------------------------------------------------------------
from app.mcp.wokwi import (
color_to_hex,
format_wokwi_diagram,
generate_arduino_sketch,
hex_to_color_name,
parse_wokwi_diagram,
)
class TestColorHelpers:
def test_named_color_to_hex(self):
assert color_to_hex("red") == "#ff0000"
assert color_to_hex("GREEN") == "#00ff00"
def test_hex_passthrough(self):
assert color_to_hex("#1a2b3c") == "#1a2b3c"
def test_hex_to_name(self):
assert hex_to_color_name("#ff0000") == "red"
assert hex_to_color_name("#unknown") == "#unknown"
SAMPLE_DIAGRAM = {
"version": 1,
"author": "test",
"editor": "wokwi",
"parts": [
{"type": "wokwi-arduino-uno", "id": "uno", "top": 0, "left": 0, "attrs": {}},
{
"type": "wokwi-led",
"id": "led1",
"top": 100,
"left": 200,
"rotate": 0,
"attrs": {"color": "red"},
},
{
"type": "wokwi-resistor",
"id": "r1",
"top": 150,
"left": 200,
"attrs": {"value": "220"},
},
],
"connections": [
["uno:13", "led1:A", "green", []],
["led1:C", "r1:1", "black", []],
["r1:2", "uno:GND.1", "black", []],
],
}
class TestParseWokwiDiagram:
def test_detects_board_fqbn(self):
result = parse_wokwi_diagram(SAMPLE_DIAGRAM)
assert result["board_fqbn"] == "arduino:avr:uno"
def test_components_parsed(self):
result = parse_wokwi_diagram(SAMPLE_DIAGRAM)
ids = [c["id"] for c in result["components"]]
assert "uno" in ids
assert "led1" in ids
assert "r1" in ids
def test_connections_parsed(self):
result = parse_wokwi_diagram(SAMPLE_DIAGRAM)
conns = result["connections"]
assert len(conns) == 3
first = conns[0]
assert first["from_part"] == "uno"
assert first["from_pin"] == "13"
assert first["to_part"] == "led1"
assert first["to_pin"] == "A"
assert first["color"] == "#00ff00" # 'green' -> hex
def test_version(self):
result = parse_wokwi_diagram(SAMPLE_DIAGRAM)
assert result["version"] == 1
def test_empty_diagram(self):
result = parse_wokwi_diagram({})
assert result["board_fqbn"] == "arduino:avr:uno"
assert result["components"] == []
assert result["connections"] == []
def test_unknown_board_defaults(self):
diagram = {"parts": [{"type": "unknown-board", "id": "b1", "attrs": {}}]}
result = parse_wokwi_diagram(diagram)
assert result["board_fqbn"] == "arduino:avr:uno"
class TestFormatWokwiDiagram:
def test_round_trip(self):
"""parse → format should preserve the essential structure."""
circuit = parse_wokwi_diagram(SAMPLE_DIAGRAM)
wokwi = format_wokwi_diagram(circuit)
part_ids = [p["id"] for p in wokwi["parts"]]
assert "led1" in part_ids
assert "r1" in part_ids
def test_board_injected(self):
circuit = {
"board_fqbn": "arduino:avr:uno",
"components": [],
"connections": [],
"version": 1,
}
wokwi = format_wokwi_diagram(circuit)
types = [p["type"] for p in wokwi["parts"]]
assert "wokwi-arduino-uno" in types
def test_author_embedded(self):
circuit = parse_wokwi_diagram(SAMPLE_DIAGRAM)
wokwi = format_wokwi_diagram(circuit, author="alice")
assert wokwi["author"] == "alice"
def test_connections_formatted(self):
circuit = parse_wokwi_diagram(SAMPLE_DIAGRAM)
wokwi = format_wokwi_diagram(circuit)
assert len(wokwi["connections"]) == 3
# Each connection should be [from, to, color, []]
for conn in wokwi["connections"]:
assert isinstance(conn, list)
assert len(conn) == 4
class TestGenerateArduinoSketch:
def test_generates_setup_and_loop(self):
circuit = parse_wokwi_diagram(SAMPLE_DIAGRAM)
sketch = generate_arduino_sketch(circuit)
assert "void setup()" in sketch
assert "void loop()" in sketch
def test_contains_serial_begin(self):
circuit = parse_wokwi_diagram(SAMPLE_DIAGRAM)
sketch = generate_arduino_sketch(circuit)
assert "Serial.begin" in sketch
# ---------------------------------------------------------------------------
# MCP tool function tests
# ---------------------------------------------------------------------------
from app.mcp.server import (
compile_project,
create_circuit,
export_wokwi_json,
generate_code_files,
import_wokwi_json,
run_project,
update_circuit,
)
class TestImportWokwiJson:
@pytest.mark.asyncio
async def test_valid_diagram(self):
result = await import_wokwi_json(json.dumps(SAMPLE_DIAGRAM))
assert "board_fqbn" in result
assert "components" in result
assert "connections" in result
@pytest.mark.asyncio
async def test_invalid_json(self):
result = await import_wokwi_json("not-json{")
assert "error" in result
@pytest.mark.asyncio
async def test_not_object(self):
result = await import_wokwi_json("[1, 2, 3]")
assert "error" in result
class TestExportWokwiJson:
@pytest.mark.asyncio
async def test_valid_circuit(self):
circuit = parse_wokwi_diagram(SAMPLE_DIAGRAM)
result = await export_wokwi_json(circuit)
assert "parts" in result
assert "connections" in result
assert result["editor"] == "velxio"
@pytest.mark.asyncio
async def test_invalid_input(self):
result = await export_wokwi_json("not a dict") # type: ignore[arg-type]
assert "error" in result
class TestCreateCircuit:
@pytest.mark.asyncio
async def test_defaults(self):
result = await create_circuit()
assert result["board_fqbn"] == "arduino:avr:uno"
assert result["components"] == []
assert result["connections"] == []
@pytest.mark.asyncio
async def test_with_components(self):
result = await create_circuit(
board_fqbn="arduino:avr:uno",
components=[{"id": "led1", "type": "wokwi-led", "left": 100, "top": 50}],
connections=[
{
"from_part": "uno",
"from_pin": "13",
"to_part": "led1",
"to_pin": "A",
"color": "green",
}
],
)
assert len(result["components"]) == 1
assert result["components"][0]["id"] == "led1"
assert len(result["connections"]) == 1
class TestUpdateCircuit:
@pytest.mark.asyncio
async def test_add_component(self):
circuit = await create_circuit(
components=[{"id": "led1", "type": "wokwi-led"}]
)
updated = await update_circuit(
circuit=circuit,
add_components=[{"id": "btn1", "type": "wokwi-pushbutton"}],
)
ids = [c["id"] for c in updated["components"]]
assert "led1" in ids
assert "btn1" in ids
@pytest.mark.asyncio
async def test_remove_component(self):
circuit = await create_circuit(
components=[
{"id": "led1", "type": "wokwi-led"},
{"id": "r1", "type": "wokwi-resistor"},
]
)
updated = await update_circuit(
circuit=circuit,
remove_component_ids=["r1"],
)
ids = [c["id"] for c in updated["components"]]
assert "led1" in ids
assert "r1" not in ids
@pytest.mark.asyncio
async def test_change_board(self):
circuit = await create_circuit()
updated = await update_circuit(
circuit=circuit,
board_fqbn="rp2040:rp2040:rpipico",
)
assert updated["board_fqbn"] == "rp2040:rp2040:rpipico"
@pytest.mark.asyncio
async def test_add_and_remove_connections(self):
circuit = await create_circuit(
connections=[
{
"from_part": "uno",
"from_pin": "13",
"to_part": "led1",
"to_pin": "A",
}
]
)
updated = await update_circuit(
circuit=circuit,
add_connections=[
{
"from_part": "uno",
"from_pin": "12",
"to_part": "led2",
"to_pin": "A",
}
],
remove_connections=[
{
"from_part": "uno",
"from_pin": "13",
"to_part": "led1",
"to_pin": "A",
}
],
)
assert len(updated["connections"]) == 1
assert updated["connections"][0]["from_pin"] == "12"
@pytest.mark.asyncio
async def test_invalid_circuit(self):
result = await update_circuit(circuit="bad") # type: ignore[arg-type]
assert "error" in result
class TestGenerateCodeFiles:
@pytest.mark.asyncio
async def test_generates_ino_file(self):
circuit = await create_circuit()
result = await generate_code_files(circuit=circuit)
assert len(result["files"]) == 1
assert result["files"][0]["name"] == "sketch.ino"
@pytest.mark.asyncio
async def test_custom_sketch_name(self):
circuit = await create_circuit()
result = await generate_code_files(circuit=circuit, sketch_name="blink")
assert result["files"][0]["name"] == "blink.ino"
@pytest.mark.asyncio
async def test_extra_instructions(self):
circuit = await create_circuit()
result = await generate_code_files(
circuit=circuit,
extra_instructions="Generated by agent",
)
assert "Generated by agent" in result["files"][0]["content"]
@pytest.mark.asyncio
async def test_invalid_circuit(self):
result = await generate_code_files(circuit="bad") # type: ignore[arg-type]
assert "error" in result
class TestCompileProject:
@pytest.mark.asyncio
async def test_missing_keys(self):
result = await compile_project(files=[{"name": "sketch.ino"}])
assert result["success"] is False
assert "content" in result["error"]
@pytest.mark.asyncio
async def test_calls_arduino_cli(self):
mock_result = {
"success": True,
"hex_content": ":00000001FF",
"binary_content": None,
"binary_type": None,
"stdout": "",
"stderr": "",
}
with patch(
"app.mcp.server._arduino.compile",
new=AsyncMock(return_value=mock_result),
):
result = await compile_project(
files=[{"name": "sketch.ino", "content": "void setup(){} void loop(){}"}]
)
assert result["success"] is True
assert result["hex_content"] is not None
class TestRunProject:
@pytest.mark.asyncio
async def test_simulation_ready_flag_on_success(self):
mock_result = {
"success": True,
"hex_content": ":00000001FF",
"binary_content": None,
"binary_type": None,
"stdout": "",
"stderr": "",
}
with patch(
"app.mcp.server._arduino.compile",
new=AsyncMock(return_value=mock_result),
):
result = await run_project(
files=[{"name": "sketch.ino", "content": "void setup(){} void loop(){}"}]
)
assert result["simulation_ready"] is True
@pytest.mark.asyncio
async def test_simulation_ready_flag_on_failure(self):
mock_result = {
"success": False,
"hex_content": None,
"binary_content": None,
"binary_type": None,
"stdout": "",
"stderr": "error",
"error": "Compilation failed",
}
with patch(
"app.mcp.server._arduino.compile",
new=AsyncMock(return_value=mock_result),
):
result = await run_project(
files=[{"name": "sketch.ino", "content": "bad code{{{"}]
)
assert result["simulation_ready"] is False

300
doc/MCP.md Normal file
View File

@ -0,0 +1,300 @@
# Velxio MCP Server
Velxio exposes a [Model Context Protocol](https://modelcontextprotocol.io/) (MCP) server that allows AI agents (e.g. Claude, Cursor) to:
- **Create and update circuits** using a structured JSON format
- **Import and export** circuits in the Wokwi `diagram.json` format
- **Generate Arduino code** from circuit definitions
- **Compile projects** and receive structured results (hex/binary, logs)
---
## Tools
| Tool | Description |
|------|-------------|
| `compile_project` | Compile Arduino sketch files → Intel HEX / binary |
| `run_project` | Compile and mark artifact as simulation-ready |
| `import_wokwi_json` | Parse a Wokwi `diagram.json` → Velxio circuit |
| `export_wokwi_json` | Serialise a Velxio circuit → Wokwi `diagram.json` |
| `create_circuit` | Create a new circuit definition |
| `update_circuit` | Merge changes into an existing circuit |
| `generate_code_files` | Generate starter `.ino` code from a circuit |
---
## Transport Options
### 1. stdio (Claude Desktop / CLI agents)
Run the MCP server as a child process:
```bash
cd backend
python mcp_server.py
```
**Claude Desktop config** (`~/.claude/claude_desktop_config.json`):
```json
{
"mcpServers": {
"velxio": {
"command": "python",
"args": ["/absolute/path/to/velxio/backend/mcp_server.py"]
}
}
}
```
### 2. SSE / HTTP (web agents, Cursor IDE)
Run the MCP SSE server on a separate port (default: **8002**):
```bash
cd backend
python mcp_sse_server.py --port 8002
```
**MCP client config (SSE transport)**:
```json
{
"mcpServers": {
"velxio": {
"url": "http://localhost:8002/sse"
}
}
}
```
> **Note**: The SSE server runs separately from the main FastAPI backend (port 8001) to avoid Starlette version conflicts. Both can run simultaneously.
---
## Setup
1. **Install dependencies**:
```bash
cd backend
pip install -r requirements.txt
```
2. **Ensure arduino-cli is installed** (required for `compile_project` / `run_project`):
```bash
arduino-cli version
arduino-cli core update-index
arduino-cli core install arduino:avr
```
---
## Example Walkthroughs
### Example 1 — Blink LED (from scratch)
**Step 1** — Create a circuit:
```json
{
"tool": "create_circuit",
"arguments": {
"board_fqbn": "arduino:avr:uno",
"components": [
{ "id": "led1", "type": "wokwi-led", "left": 150, "top": 100, "attrs": { "color": "red" } },
{ "id": "r1", "type": "wokwi-resistor", "left": 150, "top": 180, "attrs": { "value": "220" } }
],
"connections": [
{ "from_part": "uno", "from_pin": "13", "to_part": "led1", "to_pin": "A", "color": "green" },
{ "from_part": "led1", "from_pin": "C", "to_part": "r1", "to_pin": "1", "color": "black" },
{ "from_part": "r1", "from_pin": "2", "to_part": "uno", "to_pin": "GND.1", "color": "black" }
]
}
}
```
**Step 2** — Generate code:
```json
{
"tool": "generate_code_files",
"arguments": {
"circuit": "<result from Step 1>",
"sketch_name": "blink",
"extra_instructions": "Blink the red LED every 500ms"
}
}
```
**Step 3** — Compile the generated code (edit the sketch content as needed):
```json
{
"tool": "compile_project",
"arguments": {
"files": [
{
"name": "blink.ino",
"content": "void setup() { pinMode(13, OUTPUT); }\nvoid loop() { digitalWrite(13, HIGH); delay(500); digitalWrite(13, LOW); delay(500); }"
}
],
"board": "arduino:avr:uno"
}
}
```
**Response**:
```json
{
"success": true,
"hex_content": ":100000000C9434000C943E000C943E000C943E...",
"binary_content": null,
"binary_type": null,
"stdout": "",
"stderr": ""
}
```
---
### Example 2 — Import a Wokwi Circuit
**Import**:
```json
{
"tool": "import_wokwi_json",
"arguments": {
"diagram_json": "{\"version\":1,\"author\":\"example\",\"editor\":\"wokwi\",\"parts\":[{\"type\":\"wokwi-arduino-uno\",\"id\":\"uno\",\"top\":0,\"left\":0,\"attrs\":{}},{\"type\":\"wokwi-led\",\"id\":\"led1\",\"top\":100,\"left\":200,\"attrs\":{\"color\":\"green\"}}],\"connections\":[[\"uno:13\",\"led1:A\",\"green\",[]]]}"
}
}
```
**Response**:
```json
{
"board_fqbn": "arduino:avr:uno",
"components": [
{ "id": "uno", "type": "wokwi-arduino-uno", "left": 0, "top": 0, "rotate": 0, "attrs": {} },
{ "id": "led1", "type": "wokwi-led", "left": 200, "top": 100, "rotate": 0, "attrs": { "color": "green" } }
],
"connections": [
{ "from_part": "uno", "from_pin": "13", "to_part": "led1", "to_pin": "A", "color": "#00ff00" }
],
"version": 1
}
```
---
### Example 3 — Export to Wokwi Format
```json
{
"tool": "export_wokwi_json",
"arguments": {
"circuit": {
"board_fqbn": "arduino:avr:uno",
"components": [
{ "id": "led1", "type": "wokwi-led", "left": 200, "top": 100, "rotate": 0, "attrs": {} }
],
"connections": [
{ "from_part": "uno", "from_pin": "13", "to_part": "led1", "to_pin": "A", "color": "green" }
],
"version": 1
},
"author": "my-agent"
}
}
```
**Response** (Wokwi diagram.json format):
```json
{
"version": 1,
"author": "my-agent",
"editor": "velxio",
"parts": [
{ "type": "wokwi-arduino-uno", "id": "uno", "top": 0, "left": 0, "attrs": {} },
{ "type": "wokwi-led", "id": "led1", "top": 100, "left": 200, "rotate": 0, "attrs": {} }
],
"connections": [
["uno:13", "led1:A", "green", []]
]
}
```
---
## Circuit Data Format
Velxio circuits are plain JSON objects:
```json
{
"board_fqbn": "arduino:avr:uno",
"version": 1,
"components": [
{
"id": "led1",
"type": "wokwi-led",
"left": 200,
"top": 100,
"rotate": 0,
"attrs": { "color": "red" }
}
],
"connections": [
{
"from_part": "uno",
"from_pin": "13",
"to_part": "led1",
"to_pin": "A",
"color": "green"
}
]
}
```
### Supported Board FQBNs
| Board | FQBN |
|-------|------|
| Arduino Uno | `arduino:avr:uno` |
| Arduino Mega | `arduino:avr:mega` |
| Arduino Nano | `arduino:avr:nano` |
| Raspberry Pi Pico | `rp2040:rp2040:rpipico` |
| ESP32 DevKit | `esp32:esp32:esp32` |
### Common Component Types (Wokwi element names)
- `wokwi-led` — LED (attrs: `color`)
- `wokwi-resistor` — Resistor (attrs: `value` in Ω)
- `wokwi-pushbutton` — Push button
- `wokwi-buzzer` — Passive buzzer
- `wokwi-servo` — Servo motor
- `wokwi-lcd1602` — 16×2 LCD display
- `wokwi-neopixel` — NeoPixel RGB LED
---
## Sandboxing & Limits
- Compilation runs in a **temporary directory** cleaned up after each call.
- arduino-cli is invoked as a **subprocess** with no elevated privileges.
- There is no explicit CPU/memory timeout in the default configuration. For production deployments, set `COMPILATION_TIMEOUT_SECONDS` in the environment and enforce it at the process level.
---
## Running Tests
```bash
cd backend
pip install pytest pytest-asyncio
python -m pytest tests/test_mcp_tools.py -v
```