velxio/backend/app/services/arduino_cli.py

511 lines
20 KiB
Python

import subprocess
import tempfile
import asyncio
import base64
from pathlib import Path
class ArduinoCLIService:
# Board manager URLs for cores that aren't built-in
CORE_URLS: dict[str, str] = {
"rp2040:rp2040": "https://github.com/earlephilhower/arduino-pico/releases/download/global/package_rp2040_index.json",
"esp32:esp32": "https://espressif.github.io/arduino-esp32/package_esp32_index.json",
}
# Cores to auto-install on startup
REQUIRED_CORES = ["arduino:avr"]
# Cores to install on-demand when a board FQBN is requested
ON_DEMAND_CORES: dict[str, str] = {
"rp2040": "rp2040:rp2040",
"mbed_rp2040": "arduino:mbed_rp2040",
"esp32": "esp32:esp32",
}
def __init__(self, cli_path: str = "arduino-cli"):
self.cli_path = cli_path
self._ensure_board_urls()
self._ensure_core_installed()
def _ensure_board_urls(self):
"""Register additional board-manager URLs in arduino-cli config."""
try:
# Ensure config file exists (arduino-cli requires it for config add)
result = subprocess.run(
[self.cli_path, "config", "dump", "--format", "json"],
capture_output=True, text=True
)
import json
try:
cfg = json.loads(result.stdout)
except Exception:
cfg = {}
# If config is empty/missing, initialize it
config_dict = cfg.get("config", cfg)
if not config_dict or config_dict == {}:
print("[arduino-cli] Initializing config file...")
subprocess.run(
[self.cli_path, "config", "init", "--overwrite"],
capture_output=True, text=True
)
# Re-read after init
result = subprocess.run(
[self.cli_path, "config", "dump", "--format", "json"],
capture_output=True, text=True
)
try:
cfg = json.loads(result.stdout)
except Exception:
cfg = {}
existing = set()
# Handle both flat and nested config shapes
config_dict = cfg.get("config", cfg)
bm = config_dict.get("board_manager", config_dict)
urls = bm.get("additional_urls", [])
if isinstance(urls, str):
existing.add(urls)
elif isinstance(urls, list):
existing.update(urls)
for url in self.CORE_URLS.values():
if url not in existing:
print(f"[arduino-cli] Adding board manager URL: {url}")
subprocess.run(
[self.cli_path, "config", "add", "board_manager.additional_urls", url],
capture_output=True, text=True
)
# Refresh index so new cores are discoverable
print("[arduino-cli] Updating core index...")
subprocess.run(
[self.cli_path, "core", "update-index"],
capture_output=True, text=True
)
except Exception as e:
print(f"Warning: Could not configure board URLs: {e}")
def _ensure_core_installed(self):
"""
Ensure essential cores (arduino:avr) are installed at startup.
Other cores (RP2040, ESP32) are installed on-demand.
"""
try:
result = subprocess.run(
[self.cli_path, "core", "list"],
capture_output=True,
text=True
)
for core_id in self.REQUIRED_CORES:
if core_id not in result.stdout:
print(f"[arduino-cli] Core {core_id} not installed. Installing...")
subprocess.run(
[self.cli_path, "core", "install", core_id],
check=True
)
print(f"[arduino-cli] Core {core_id} installed successfully")
except Exception as e:
print(f"Warning: Could not verify cores: {e}")
print("Please ensure arduino-cli is installed and in PATH")
def _core_id_for_fqbn(self, fqbn: str) -> str | None:
"""Extract the core ID needed for a given FQBN."""
for prefix, core_id in self.ON_DEMAND_CORES.items():
if prefix in fqbn:
return core_id
return None
def _is_core_installed(self, core_id: str) -> bool:
"""Check whether a core is currently installed."""
result = subprocess.run(
[self.cli_path, "core", "list"],
capture_output=True, text=True
)
return core_id in result.stdout
async def ensure_core_for_board(self, fqbn: str) -> dict:
"""
Auto-install the core required by a board FQBN if not present.
Returns status dict with install log.
"""
core_id = self._core_id_for_fqbn(fqbn)
if core_id is None:
# Built-in core (arduino:avr) — should already be there
return {"needed": False, "installed": True, "core_id": None, "log": ""}
if self._is_core_installed(core_id):
return {"needed": False, "installed": True, "core_id": core_id, "log": ""}
# Install the core
print(f"[arduino-cli] Auto-installing core {core_id} for board {fqbn}...")
def _install():
return subprocess.run(
[self.cli_path, "core", "install", core_id],
capture_output=True, text=True
)
result = await asyncio.to_thread(_install)
log = result.stdout + "\n" + result.stderr
if result.returncode == 0:
print(f"[arduino-cli] Core {core_id} installed successfully")
return {"needed": True, "installed": True, "core_id": core_id, "log": log.strip()}
else:
print(f"[arduino-cli] Failed to install core {core_id}: {result.stderr}")
return {"needed": True, "installed": False, "core_id": core_id, "log": log.strip()}
async def get_setup_status(self) -> dict:
"""Return the current state of arduino-cli and installed cores."""
try:
version_result = subprocess.run(
[self.cli_path, "version"],
capture_output=True, text=True
)
version = version_result.stdout.strip() if version_result.returncode == 0 else "unknown"
list_result = subprocess.run(
[self.cli_path, "core", "list"],
capture_output=True, text=True
)
cores_raw = list_result.stdout.strip()
except FileNotFoundError:
return {
"cli_available": False,
"version": None,
"cores": [],
"error": "arduino-cli not found in PATH"
}
except Exception as e:
return {
"cli_available": False,
"version": None,
"cores": [],
"error": str(e)
}
# Parse installed cores
cores = []
for line in cores_raw.splitlines()[1:]:
parts = line.split()
if len(parts) >= 3:
cores.append({"id": parts[0], "installed": parts[1], "latest": parts[2]})
return {
"cli_available": True,
"version": version,
"cores": cores,
"error": None
}
def _is_rp2040_board(self, fqbn: str) -> bool:
"""Return True if the FQBN targets an RP2040/RP2350 board."""
return any(p in fqbn for p in ("rp2040", "rp2350", "mbed_rp2040", "mbed_rp2350"))
async def compile(self, files: list[dict], board_fqbn: str = "arduino:avr:uno") -> dict:
"""
Compile Arduino sketch using arduino-cli.
`files` is a list of {"name": str, "content": str} dicts.
arduino-cli requires the sketch directory to contain a .ino file whose
name matches the directory ("sketch"). If none exists we promote the
first .ino file to sketch.ino automatically.
Returns:
dict with keys: success, hex_content, stdout, stderr, error
"""
print(f"\n=== Starting compilation ===")
print(f"Board: {board_fqbn}")
print(f"Files: {[f['name'] for f in files]}")
# Create temporary directory for sketch
with tempfile.TemporaryDirectory() as temp_dir:
sketch_dir = Path(temp_dir) / "sketch"
sketch_dir.mkdir()
# Determine whether the caller already provides a "sketch.ino"
has_sketch_ino = any(f["name"] == "sketch.ino" for f in files)
main_ino_written = False
for file_entry in files:
name: str = file_entry["name"]
content: str = file_entry["content"]
# Promote the first .ino to sketch.ino if none explicitly named so
write_name = name
if not has_sketch_ino and name.endswith(".ino") and not main_ino_written:
write_name = "sketch.ino"
main_ino_written = True
# RP2040: redirect Serial → Serial1 in the main sketch file only
if "rp2040" in board_fqbn and write_name == "sketch.ino":
content = "#define Serial Serial1\n" + content
(sketch_dir / write_name).write_text(content, encoding="utf-8")
# Fallback: no .ino files provided at all
if not any(f["name"].endswith(".ino") for f in files):
(sketch_dir / "sketch.ino").write_text("void setup(){}\nvoid loop(){}", encoding="utf-8")
print(f"Sketch directory contents: {[p.name for p in sketch_dir.iterdir()]}")
build_dir = sketch_dir / "build"
build_dir.mkdir()
print(f"Build directory: {build_dir}")
try:
# Run compilation using subprocess.run in a thread (Windows compatible)
cmd = [
self.cli_path,
"compile",
"--fqbn", board_fqbn,
"--output-dir", str(build_dir),
str(sketch_dir)
]
print(f"Running command: {' '.join(cmd)}")
# Use subprocess.run in a thread for Windows compatibility
def run_compile():
return subprocess.run(
cmd,
capture_output=True,
text=True
)
result = await asyncio.to_thread(run_compile)
print(f"Process return code: {result.returncode}")
print(f"Stdout: {result.stdout}")
print(f"Stderr: {result.stderr}")
if result.returncode == 0:
print(f"Files in build dir: {list(build_dir.iterdir())}")
if self._is_rp2040_board(board_fqbn):
# RP2040 outputs a .bin file (and optionally .uf2)
# Try .bin first (raw binary, simplest to load into emulator)
bin_file = build_dir / "sketch.ino.bin"
uf2_file = build_dir / "sketch.ino.uf2"
target_file = bin_file if bin_file.exists() else (uf2_file if uf2_file.exists() else None)
if target_file:
raw_bytes = target_file.read_bytes()
binary_b64 = base64.b64encode(raw_bytes).decode('ascii')
print(f"[RP2040] Binary file: {target_file.name}, size: {len(raw_bytes)} bytes")
print("=== RP2040 Compilation successful ===\n")
return {
"success": True,
"hex_content": None,
"binary_content": binary_b64,
"binary_type": "bin" if target_file == bin_file else "uf2",
"stdout": result.stdout,
"stderr": result.stderr
}
else:
print(f"[RP2040] Binary file not found. Files: {list(build_dir.iterdir())}")
print("=== RP2040 Compilation failed: binary not found ===\n")
return {
"success": False,
"error": "RP2040 binary (.bin/.uf2) not found after compilation",
"stdout": result.stdout,
"stderr": result.stderr
}
else:
# AVR outputs a .hex file (Intel HEX format)
hex_file = build_dir / "sketch.ino.hex"
print(f"Looking for hex file at: {hex_file}")
print(f"Hex file exists: {hex_file.exists()}")
if hex_file.exists():
hex_content = hex_file.read_text()
print(f"Hex file size: {len(hex_content)} bytes")
print("=== AVR Compilation successful ===\n")
return {
"success": True,
"hex_content": hex_content,
"binary_content": None,
"stdout": result.stdout,
"stderr": result.stderr
}
else:
print(f"Files in build dir: {list(build_dir.iterdir())}")
print("=== Compilation failed: hex file not found ===\n")
return {
"success": False,
"error": "Hex file not found after compilation",
"stdout": result.stdout,
"stderr": result.stderr
}
else:
print("=== Compilation failed ===\n")
return {
"success": False,
"error": "Compilation failed",
"stdout": result.stdout,
"stderr": result.stderr
}
except Exception as e:
print(f"=== Exception during compilation: {e} ===\n")
import traceback
traceback.print_exc()
return {
"success": False,
"error": str(e),
"stdout": "",
"stderr": ""
}
async def list_boards(self) -> list:
"""
List available Arduino boards
"""
try:
process = await asyncio.create_subprocess_exec(
self.cli_path,
"board",
"listall",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, _ = await process.communicate()
# Parse output (format: "Board Name FQBN")
boards = []
for line in stdout.decode().splitlines()[1:]: # Skip header
if line.strip():
parts = line.split()
if len(parts) >= 2:
name = " ".join(parts[:-1])
fqbn = parts[-1]
boards.append({"name": name, "fqbn": fqbn})
return boards
except Exception as e:
print(f"Error listing boards: {e}")
return []
async def search_libraries(self, query: str) -> dict:
"""
Search for Arduino libraries
"""
try:
def _run():
return subprocess.run(
[self.cli_path, "lib", "search", query, "--format", "json"],
capture_output=True, text=True, encoding='utf-8', errors='replace'
)
result = await asyncio.to_thread(_run)
stdout, stderr = result.stdout, result.stderr
if result.returncode != 0:
print(f"Error searching libraries: {stderr}")
return {"success": False, "error": stderr}
import json
try:
results = json.loads(stdout)
libraries = results.get("libraries", [])
# arduino-cli search returns each lib with a "releases" dict.
# Inject a "latest" key with the data of the highest version so the
# frontend can access lib.latest.version / author / sentence directly.
def _parse_version(v: str):
try:
return tuple(int(x) for x in v.split("."))
except Exception:
return (0,)
for lib in libraries:
releases = lib.get("releases") or {}
if releases:
latest_key = max(releases.keys(), key=_parse_version)
lib["latest"] = {**releases[latest_key], "version": latest_key}
return {"success": True, "libraries": libraries}
except json.JSONDecodeError:
return {"success": False, "error": "Invalid output format from arduino-cli"}
except Exception as e:
print(f"Exception searching libraries: {e}")
return {"success": False, "error": str(e)}
async def install_library(self, library_name: str) -> dict:
"""
Install an Arduino library
"""
try:
print(f"Installing library: {library_name}")
def _run():
return subprocess.run(
[self.cli_path, "lib", "install", library_name],
capture_output=True, text=True, encoding='utf-8', errors='replace'
)
result = await asyncio.to_thread(_run)
if result.returncode == 0:
print(f"Successfully installed {library_name}")
return {"success": True, "stdout": result.stdout}
else:
print(f"Failed to install {library_name}: {result.stderr}")
return {"success": False, "error": result.stderr, "stdout": result.stdout}
except Exception as e:
print(f"Exception installing library: {e}")
return {"success": False, "error": str(e)}
async def list_installed_libraries(self) -> dict:
"""
List all installed Arduino libraries
"""
try:
def _run():
return subprocess.run(
[self.cli_path, "lib", "list", "--format", "json"],
capture_output=True, text=True, encoding='utf-8', errors='replace'
)
result = await asyncio.to_thread(_run)
stdout, stderr = result.stdout, result.stderr
if result.returncode != 0:
print(f"Error listing libraries: {stderr}")
return {"success": False, "error": stderr}
import json
try:
if not stdout.strip():
return {"success": True, "libraries": []}
results = json.loads(stdout)
# arduino-cli lib list --format json wraps results in "installed_libraries"
if isinstance(results, list):
libraries = results
elif isinstance(results, dict):
libraries = (
results.get("installed_libraries")
or results.get("libraries")
or []
)
else:
libraries = []
return {"success": True, "libraries": libraries}
except json.JSONDecodeError:
return {"success": False, "error": "Invalid output format from arduino-cli"}
except Exception as e:
print(f"Exception listing libraries: {e}")
return {"success": False, "error": str(e)}