import subprocess import tempfile import asyncio import base64 from pathlib import Path class ArduinoCLIService: # Board manager URLs for cores that aren't built-in CORE_URLS: dict[str, str] = { "rp2040:rp2040": "https://github.com/earlephilhower/arduino-pico/releases/download/global/package_rp2040_index.json", "esp32:esp32": "https://espressif.github.io/arduino-esp32/package_esp32_index.json", } # Cores to auto-install on startup REQUIRED_CORES = ["arduino:avr"] # Cores to install on-demand when a board FQBN is requested ON_DEMAND_CORES: dict[str, str] = { "rp2040": "rp2040:rp2040", "mbed_rp2040": "arduino:mbed_rp2040", "esp32": "esp32:esp32", } def __init__(self, cli_path: str = "arduino-cli"): self.cli_path = cli_path self._ensure_board_urls() self._ensure_core_installed() def _ensure_board_urls(self): """Register additional board-manager URLs in arduino-cli config.""" try: # Ensure config file exists (arduino-cli requires it for config add) result = subprocess.run( [self.cli_path, "config", "dump", "--format", "json"], capture_output=True, text=True ) import json try: cfg = json.loads(result.stdout) except Exception: cfg = {} # If config is empty/missing, initialize it config_dict = cfg.get("config", cfg) if not config_dict or config_dict == {}: print("[arduino-cli] Initializing config file...") subprocess.run( [self.cli_path, "config", "init", "--overwrite"], capture_output=True, text=True ) # Re-read after init result = subprocess.run( [self.cli_path, "config", "dump", "--format", "json"], capture_output=True, text=True ) try: cfg = json.loads(result.stdout) except Exception: cfg = {} existing = set() # Handle both flat and nested config shapes config_dict = cfg.get("config", cfg) bm = config_dict.get("board_manager", config_dict) urls = bm.get("additional_urls", []) if isinstance(urls, str): existing.add(urls) elif isinstance(urls, list): existing.update(urls) for url in self.CORE_URLS.values(): if url not in existing: print(f"[arduino-cli] Adding board manager URL: {url}") subprocess.run( [self.cli_path, "config", "add", "board_manager.additional_urls", url], capture_output=True, text=True ) # Refresh index so new cores are discoverable print("[arduino-cli] Updating core index...") subprocess.run( [self.cli_path, "core", "update-index"], capture_output=True, text=True ) except Exception as e: print(f"Warning: Could not configure board URLs: {e}") def _ensure_core_installed(self): """ Ensure essential cores (arduino:avr) are installed at startup. Other cores (RP2040, ESP32) are installed on-demand. """ try: result = subprocess.run( [self.cli_path, "core", "list"], capture_output=True, text=True ) for core_id in self.REQUIRED_CORES: if core_id not in result.stdout: print(f"[arduino-cli] Core {core_id} not installed. Installing...") subprocess.run( [self.cli_path, "core", "install", core_id], check=True ) print(f"[arduino-cli] Core {core_id} installed successfully") except Exception as e: print(f"Warning: Could not verify cores: {e}") print("Please ensure arduino-cli is installed and in PATH") def _core_id_for_fqbn(self, fqbn: str) -> str | None: """Extract the core ID needed for a given FQBN.""" for prefix, core_id in self.ON_DEMAND_CORES.items(): if prefix in fqbn: return core_id return None def _is_core_installed(self, core_id: str) -> bool: """Check whether a core is currently installed.""" result = subprocess.run( [self.cli_path, "core", "list"], capture_output=True, text=True ) return core_id in result.stdout async def ensure_core_for_board(self, fqbn: str) -> dict: """ Auto-install the core required by a board FQBN if not present. Returns status dict with install log. """ core_id = self._core_id_for_fqbn(fqbn) if core_id is None: # Built-in core (arduino:avr) — should already be there return {"needed": False, "installed": True, "core_id": None, "log": ""} if self._is_core_installed(core_id): return {"needed": False, "installed": True, "core_id": core_id, "log": ""} # Install the core print(f"[arduino-cli] Auto-installing core {core_id} for board {fqbn}...") def _install(): return subprocess.run( [self.cli_path, "core", "install", core_id], capture_output=True, text=True ) result = await asyncio.to_thread(_install) log = result.stdout + "\n" + result.stderr if result.returncode == 0: print(f"[arduino-cli] Core {core_id} installed successfully") return {"needed": True, "installed": True, "core_id": core_id, "log": log.strip()} else: print(f"[arduino-cli] Failed to install core {core_id}: {result.stderr}") return {"needed": True, "installed": False, "core_id": core_id, "log": log.strip()} async def get_setup_status(self) -> dict: """Return the current state of arduino-cli and installed cores.""" try: version_result = subprocess.run( [self.cli_path, "version"], capture_output=True, text=True ) version = version_result.stdout.strip() if version_result.returncode == 0 else "unknown" list_result = subprocess.run( [self.cli_path, "core", "list"], capture_output=True, text=True ) cores_raw = list_result.stdout.strip() except FileNotFoundError: return { "cli_available": False, "version": None, "cores": [], "error": "arduino-cli not found in PATH" } except Exception as e: return { "cli_available": False, "version": None, "cores": [], "error": str(e) } # Parse installed cores cores = [] for line in cores_raw.splitlines()[1:]: parts = line.split() if len(parts) >= 3: cores.append({"id": parts[0], "installed": parts[1], "latest": parts[2]}) return { "cli_available": True, "version": version, "cores": cores, "error": None } def _is_rp2040_board(self, fqbn: str) -> bool: """Return True if the FQBN targets an RP2040/RP2350 board.""" return any(p in fqbn for p in ("rp2040", "rp2350", "mbed_rp2040", "mbed_rp2350")) async def compile(self, files: list[dict], board_fqbn: str = "arduino:avr:uno") -> dict: """ Compile Arduino sketch using arduino-cli. `files` is a list of {"name": str, "content": str} dicts. arduino-cli requires the sketch directory to contain a .ino file whose name matches the directory ("sketch"). If none exists we promote the first .ino file to sketch.ino automatically. Returns: dict with keys: success, hex_content, stdout, stderr, error """ print(f"\n=== Starting compilation ===") print(f"Board: {board_fqbn}") print(f"Files: {[f['name'] for f in files]}") # Create temporary directory for sketch with tempfile.TemporaryDirectory() as temp_dir: sketch_dir = Path(temp_dir) / "sketch" sketch_dir.mkdir() # Determine whether the caller already provides a "sketch.ino" has_sketch_ino = any(f["name"] == "sketch.ino" for f in files) main_ino_written = False for file_entry in files: name: str = file_entry["name"] content: str = file_entry["content"] # Promote the first .ino to sketch.ino if none explicitly named so write_name = name if not has_sketch_ino and name.endswith(".ino") and not main_ino_written: write_name = "sketch.ino" main_ino_written = True # RP2040: redirect Serial → Serial1 in the main sketch file only if "rp2040" in board_fqbn and write_name == "sketch.ino": content = "#define Serial Serial1\n" + content (sketch_dir / write_name).write_text(content, encoding="utf-8") # Fallback: no .ino files provided at all if not any(f["name"].endswith(".ino") for f in files): (sketch_dir / "sketch.ino").write_text("void setup(){}\nvoid loop(){}", encoding="utf-8") print(f"Sketch directory contents: {[p.name for p in sketch_dir.iterdir()]}") build_dir = sketch_dir / "build" build_dir.mkdir() print(f"Build directory: {build_dir}") try: # Run compilation using subprocess.run in a thread (Windows compatible) cmd = [ self.cli_path, "compile", "--fqbn", board_fqbn, "--output-dir", str(build_dir), str(sketch_dir) ] print(f"Running command: {' '.join(cmd)}") # Use subprocess.run in a thread for Windows compatibility def run_compile(): return subprocess.run( cmd, capture_output=True, text=True ) result = await asyncio.to_thread(run_compile) print(f"Process return code: {result.returncode}") print(f"Stdout: {result.stdout}") print(f"Stderr: {result.stderr}") if result.returncode == 0: print(f"Files in build dir: {list(build_dir.iterdir())}") if self._is_rp2040_board(board_fqbn): # RP2040 outputs a .bin file (and optionally .uf2) # Try .bin first (raw binary, simplest to load into emulator) bin_file = build_dir / "sketch.ino.bin" uf2_file = build_dir / "sketch.ino.uf2" target_file = bin_file if bin_file.exists() else (uf2_file if uf2_file.exists() else None) if target_file: raw_bytes = target_file.read_bytes() binary_b64 = base64.b64encode(raw_bytes).decode('ascii') print(f"[RP2040] Binary file: {target_file.name}, size: {len(raw_bytes)} bytes") print("=== RP2040 Compilation successful ===\n") return { "success": True, "hex_content": None, "binary_content": binary_b64, "binary_type": "bin" if target_file == bin_file else "uf2", "stdout": result.stdout, "stderr": result.stderr } else: print(f"[RP2040] Binary file not found. Files: {list(build_dir.iterdir())}") print("=== RP2040 Compilation failed: binary not found ===\n") return { "success": False, "error": "RP2040 binary (.bin/.uf2) not found after compilation", "stdout": result.stdout, "stderr": result.stderr } else: # AVR outputs a .hex file (Intel HEX format) hex_file = build_dir / "sketch.ino.hex" print(f"Looking for hex file at: {hex_file}") print(f"Hex file exists: {hex_file.exists()}") if hex_file.exists(): hex_content = hex_file.read_text() print(f"Hex file size: {len(hex_content)} bytes") print("=== AVR Compilation successful ===\n") return { "success": True, "hex_content": hex_content, "binary_content": None, "stdout": result.stdout, "stderr": result.stderr } else: print(f"Files in build dir: {list(build_dir.iterdir())}") print("=== Compilation failed: hex file not found ===\n") return { "success": False, "error": "Hex file not found after compilation", "stdout": result.stdout, "stderr": result.stderr } else: print("=== Compilation failed ===\n") return { "success": False, "error": "Compilation failed", "stdout": result.stdout, "stderr": result.stderr } except Exception as e: print(f"=== Exception during compilation: {e} ===\n") import traceback traceback.print_exc() return { "success": False, "error": str(e), "stdout": "", "stderr": "" } async def list_boards(self) -> list: """ List available Arduino boards """ try: process = await asyncio.create_subprocess_exec( self.cli_path, "board", "listall", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, _ = await process.communicate() # Parse output (format: "Board Name FQBN") boards = [] for line in stdout.decode().splitlines()[1:]: # Skip header if line.strip(): parts = line.split() if len(parts) >= 2: name = " ".join(parts[:-1]) fqbn = parts[-1] boards.append({"name": name, "fqbn": fqbn}) return boards except Exception as e: print(f"Error listing boards: {e}") return [] async def search_libraries(self, query: str) -> dict: """ Search for Arduino libraries """ try: def _run(): return subprocess.run( [self.cli_path, "lib", "search", query, "--format", "json"], capture_output=True, text=True, encoding='utf-8', errors='replace' ) result = await asyncio.to_thread(_run) stdout, stderr = result.stdout, result.stderr if result.returncode != 0: print(f"Error searching libraries: {stderr}") return {"success": False, "error": stderr} import json try: results = json.loads(stdout) libraries = results.get("libraries", []) # arduino-cli search returns each lib with a "releases" dict. # Inject a "latest" key with the data of the highest version so the # frontend can access lib.latest.version / author / sentence directly. def _parse_version(v: str): try: return tuple(int(x) for x in v.split(".")) except Exception: return (0,) for lib in libraries: releases = lib.get("releases") or {} if releases: latest_key = max(releases.keys(), key=_parse_version) lib["latest"] = {**releases[latest_key], "version": latest_key} return {"success": True, "libraries": libraries} except json.JSONDecodeError: return {"success": False, "error": "Invalid output format from arduino-cli"} except Exception as e: print(f"Exception searching libraries: {e}") return {"success": False, "error": str(e)} async def install_library(self, library_name: str) -> dict: """ Install an Arduino library """ try: print(f"Installing library: {library_name}") def _run(): return subprocess.run( [self.cli_path, "lib", "install", library_name], capture_output=True, text=True, encoding='utf-8', errors='replace' ) result = await asyncio.to_thread(_run) if result.returncode == 0: print(f"Successfully installed {library_name}") return {"success": True, "stdout": result.stdout} else: print(f"Failed to install {library_name}: {result.stderr}") return {"success": False, "error": result.stderr, "stdout": result.stdout} except Exception as e: print(f"Exception installing library: {e}") return {"success": False, "error": str(e)} async def list_installed_libraries(self) -> dict: """ List all installed Arduino libraries """ try: def _run(): return subprocess.run( [self.cli_path, "lib", "list", "--format", "json"], capture_output=True, text=True, encoding='utf-8', errors='replace' ) result = await asyncio.to_thread(_run) stdout, stderr = result.stdout, result.stderr if result.returncode != 0: print(f"Error listing libraries: {stderr}") return {"success": False, "error": stderr} import json try: if not stdout.strip(): return {"success": True, "libraries": []} results = json.loads(stdout) # arduino-cli lib list --format json wraps results in "installed_libraries" if isinstance(results, list): libraries = results elif isinstance(results, dict): libraries = ( results.get("installed_libraries") or results.get("libraries") or [] ) else: libraries = [] return {"success": True, "libraries": libraries} except json.JSONDecodeError: return {"success": False, "error": "Invalid output format from arduino-cli"} except Exception as e: print(f"Exception listing libraries: {e}") return {"success": False, "error": str(e)}