Plugin Development
MeshDash plugins extend the application with new API endpoints, background tasks, UI pages, and direct access to live mesh data — all without modifying core files. Plugins are loaded at startup from the /plugins/ directory and receive full access to the database, the in-memory node store, the radio connection manager, and the asyncio event loop.
The plugin directory is <install_dir>/plugins/. MeshDash creates it automatically if it doesn't exist.
Plugin Directory Structure
plugins/
└── my_plugin/
    ├── manifest.json      ← required
    ├── main.py            ← Python entry point
    ├── setup.py           ← optional one-time setup script
    ├── .setup_complete    ← sentinel written after successful setup
    ├── .disabled          ← if present, plugin is skipped entirely
    ├── requirements.txt   ← documents your dependencies (informational)
    ├── config.json        ← your plugin's persistent settings (optional)
    ├── data.db            ← your plugin's own SQLite DB (optional)
    ├── static/            ← files served at /static/plugins/my_plugin/
    │   ├── index.html     ← your UI page
    │   ├── app.js         ← your frontend JS
    │   ├── style.css      ← your custom CSS
    │   └── bridge.html    ← background iframe page (optional)
    └── lib/               ← extra Python modules (optional)
        └── helpers.py
The directory name is the plugin's fallback ID. The canonical ID is the id field in manifest.json — they must match exactly.
manifest.json — Full Reference
{
"id": "my_plugin",
"name": "My Plugin",
"version": "1.0.0",
"description": "Does something useful.",
"watchdog": false,
"entry_point": "main.py",
"router_prefix": "/api/plugins/my_plugin",
"static_prefix": "/static/plugins/my_plugin",
"bridge": "bridge.html",
"nav_menu": [
{
"label": "My Plugin",
"href": "/plugin/my_plugin/index.html",
"icon": "fa-star"
}
]
}
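The field rules in this reference (allowed id characters, the mandatory watchdog flag, the bridge filename restrictions) can be checked before a plugin is ever loaded. The following is a minimal sketch of such a pre-flight check, not MeshDash's own validator. The function name validate_manifest and its error strings are hypothetical, and it assumes that a missing id falls back to the directory name.

```python
import re

ID_RE = re.compile(r"^[a-zA-Z0-9_-]+$")       # characters documented for "id"
BRIDGE_RE = re.compile(r"^[a-zA-Z0-9_.-]+$")   # alphanumerics, underscores, hyphens, dots

def validate_manifest(manifest: dict, dir_name: str) -> list:
    """Return a list of problems found; an empty list means the checks passed.

    Hypothetical helper: it mirrors the documented rules only, not the
    engine's real validation code.
    """
    errors = []

    # Assumption: with no "id" field, the directory name is used as the ID.
    plugin_id = manifest.get("id", dir_name)
    if not isinstance(plugin_id, str) or not ID_RE.match(plugin_id):
        errors.append("id must use only [a-zA-Z0-9_-] characters")
    elif plugin_id != dir_name:
        errors.append("id must match the plugin directory name")

    # "watchdog" has no default, so it must be an explicit boolean.
    if not isinstance(manifest.get("watchdog"), bool):
        errors.append("watchdog must be present and set to true or false")

    # "bridge" is optional, but must be a bare filename when given.
    bridge = manifest.get("bridge")
    if bridge is not None and not (isinstance(bridge, str) and BRIDGE_RE.match(bridge)):
        errors.append("bridge must be a bare filename without path separators")

    return errors
```

Running a check like this in a plugin's own test suite catches the two most common authoring mistakes (a renamed directory and a forgotten watchdog key) before MeshDash starts.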
id: Accepts only [a-zA-Z0-9_-] characters. Must match the plugin directory name exactly. A mismatch causes the plugin to load under an unexpected ID, breaking all route prefixes and watchdog keys.
watchdog: true if the plugin runs a persistent background thread that must heartbeat every 120 seconds. Set false for passive plugins (pure API, no background loop). Omission is treated as an authoring error by design; there is no default.
entry_point: The Python file to import (main.py in the example above). The module is imported in a dedicated thread with a 10-second hard timeout. Any blocking call at module scope (network I/O, time.sleep, heavy computation) will cause import failure. Keep module-scope code to bare declarations only.
router_prefix: URL prefix applied to routes on plugin_router. Default: /api/plugins/{id}. A route decorated @plugin_router.get("/status") becomes GET /api/plugins/my_plugin/status.
static_prefix: URL prefix for the plugin's static/ subdirectory. Default: /static/plugins/{id}. Files are served directly via StaticFiles, so no Python code runs when they are fetched. /static/plugins/my_plugin/index.html maps to plugins/my_plugin/static/index.html.
bridge: Filename of a page in the static/ dir that MeshDash loads as a hidden background iframe via the PluginBridge system. Used for persistent JS context (WebSockets, polling, notifications) running silently while the user navigates. Must be an exact filename with no path separators. Only alphanumerics, underscores, hyphens, and dots are accepted.
nav_menu: List of menu entries, each with label, href, and icon (Font Awesome class without the fas prefix, just "fa-star"). Use /plugin/{id}/index.html to frame your page inside the MeshDash chrome with sidebar and theme. Use /static/plugins/{id}/index.html for a raw standalone page without chrome.
Python Entry Point — What the Engine Looks For
Your entry point file is imported as a standard Python module. The plugin engine looks for exactly two optional names at module scope:
plugin_router: a FastAPI APIRouter instance. If present, it is mounted at router_prefix with an automatic state-check dependency on every route that enforces the plugin's running/hung status.
init_plugin(context): a function called after import with the application context dict injected. This is the only place you should store service references and start threads. It also runs under a 15-second hard timeout.
Neither is mandatory — a plugin with only static/ files and no Python is valid.
No time.sleep, no heavy data loading, and no thread creation at module scope. All of that belongs inside init_plugin.
One-Time Setup Routine — Installing Dependencies
Some plugins require Python packages that are not part of MeshDash's core environment — for example, Pillow for image generation, numpy for signal processing, or paho-mqtt for external MQTT bridging. The recommended pattern is a one-time setup script guarded by a sentinel file so the installation only ever runs once.
How the Sentinel Pattern Works
Your plugin ships a setup.py file alongside main.py. Inside init_plugin, before doing anything else, you check whether a .setup_complete file exists in the plugin directory. If it does not, you run setup.py as a subprocess, wait for it to finish, and the script itself writes the sentinel on success. On every subsequent startup the sentinel is found immediately and setup is skipped entirely.
# plugins/my_plugin/setup.py
"""
One-time setup script for my_plugin.
Executed by main.py as a subprocess on first startup only.
DO NOT import this file — it is run as __main__.
"""
import subprocess
import sys
import os

PLUGIN_DIR = os.path.dirname(os.path.abspath(__file__))

def run():
    packages = [
        "Pillow>=10.0.0",
        "numpy>=1.24.0",
    ]
    print(f"[my_plugin setup] Installing {len(packages)} package(s)...")
    for pkg in packages:
        print(f"[my_plugin setup] Installing: {pkg}")
        result = subprocess.run(
            [sys.executable, "-m", "pip", "install", pkg, "--quiet"],
            capture_output=True,
            text=True
        )
        if result.returncode != 0:
            print(f"[my_plugin setup] FAILED: {pkg}")
            print(result.stderr)
            sys.exit(1)  # non-zero exit signals failure to main.py
        print(f"[my_plugin setup] OK: {pkg}")
    # Write sentinel last — only if everything above succeeded
    sentinel = os.path.join(PLUGIN_DIR, ".setup_complete")
    with open(sentinel, "w") as f:
        f.write("1")
    print("[my_plugin setup] Setup complete.")

if __name__ == "__main__":
    run()
# plugins/my_plugin/main.py
import os
import sys
import subprocess
import threading
import time
from fastapi import APIRouter

PLUGIN_DIR = os.path.dirname(os.path.abspath(__file__))
SENTINEL = os.path.join(PLUGIN_DIR, ".setup_complete")
SETUP_SCRIPT = os.path.join(PLUGIN_DIR, "setup.py")

plugin_router = APIRouter()
_logger = None
_setup_ok = False

def _run_setup(logger):
    """Run setup.py as a subprocess and return True on success."""
    if not os.path.exists(SETUP_SCRIPT):
        return True  # no setup script — nothing to do
    logger.info("First run detected — executing setup.py ...")
    try:
        result = subprocess.run(
            [sys.executable, SETUP_SCRIPT],
            capture_output=True,
            text=True,
            timeout=300  # 5-minute hard cap for pip installs
        )
        for line in result.stdout.splitlines():
            logger.info(line)
        if result.returncode != 0:
            for line in result.stderr.splitlines():
                logger.error(line)
            logger.error("setup.py exited with code %d — plugin cannot start",
                         result.returncode)
            return False
        logger.info("setup.py completed successfully")
        return True
    except subprocess.TimeoutExpired:
        logger.error("setup.py timed out after 300 seconds")
        return False
    except Exception as e:
        logger.error("setup.py failed with exception: %s", e)
        return False

@plugin_router.get("/status")
async def status():
    return {"setup_ok": _setup_ok}

def init_plugin(context: dict):
    global _logger, _setup_ok
    _logger = context["logger"]
    # ── Step 1: Run one-time setup if sentinel is absent ─────────────────────
    if not os.path.exists(SENTINEL):
        ok = _run_setup(_logger)
        if not ok:
            # Raising here marks the plugin as crashed — routes will return 503
            raise RuntimeError("Plugin setup failed. Check plugin logs for details.")
    else:
        _logger.info("Setup sentinel found — skipping setup")
    # ── Step 2: Import packages that setup.py installed ──────────────────────
    # IMPORTANT: always import these here, never at module scope.
    # At module scope the import runs before init_plugin, before setup has run.
    try:
        from PIL import Image
        import numpy as np
        _setup_ok = True
        _logger.info("my_plugin ready — PIL and numpy available")
    except ImportError as e:
        raise RuntimeError(f"Required package missing after setup: {e}")
    _logger.info("my_plugin initialised")
Running pip install as a subprocess (rather than calling pip internals directly) is the only reliable way to install packages into the same Python environment that MeshDash is running in, without side-effects on the current process. The sentinel file prevents the install from running again on every restart.
Do not import setup-installed packages at module scope: module-scope imports execute before init_plugin runs. If the package is not yet installed, the import will raise ImportError and crash the plugin. Always import them inside init_plugin, after confirming the sentinel exists.
Resetting Setup
To force setup to run again — for example after adding a new dependency in a plugin update — delete the sentinel and restart MeshDash:
rm plugins/my_plugin/.setup_complete
Packages Already Available — No Setup Needed
The following are part of MeshDash's own environment and are always importable without any setup:
fastapi, uvicorn, pydantic, httpx, requests, starlette
sqlite3 (stdlib), asyncio (stdlib), threading (stdlib)
json, os, sys, time, datetime, pathlib, re, io, base64
logging, subprocess, uuid, secrets, shutil, zipfile, socket
typing, collections, statistics
meshtastic (and portnums_pb2, admin_pb2, channel_pb2, etc.)
pubsub (PyPubSub)
jose (python-jose — JWT)
passlib (bcrypt)
sse_starlette (Server-Sent Events)
bs4 (BeautifulSoup)
Anything not on this list needs to be installed by your setup.py.
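When it is unclear whether a dependency is already present in the running environment, its importability can be probed without actually importing it. The sketch below uses the standard library's importlib.util.find_spec for that; the helper name missing_modules is hypothetical, and the check only makes sense for top-level module names (for example "PIL", not "PIL.Image").

```python
import importlib.util

def missing_modules(module_names):
    """Return the top-level module names that cannot currently be imported.

    find_spec() only locates the module; it does not execute it, so this
    is cheap enough to call from init_plugin.
    """
    return [name for name in module_names
            if importlib.util.find_spec(name) is None]

# Example: stdlib modules are found, a made-up name is reported as missing.
print(missing_modules(["json", "sqlite3", "no_such_module_for_demo"]))
```

A plugin could log the result on startup to make a broken environment obvious in the Plugin Logs panel, while still leaving the actual installation to setup.py.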
init_plugin — Full Context Reference
The context dict passed to init_plugin(context) provides access to every major subsystem. All keys are always present regardless of connection state at the time your plugin loads.
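As one illustration of context keys working together, the sketch below shows a heartbeat loop built on the plugin_watchdog and plugin_id keys described in this reference. It is a minimal example under stated assumptions: start_heartbeat, the stop event, and the 30-second interval are hypothetical choices, and the context here is a plain dict standing in for the one MeshDash injects.

```python
import threading
import time

def start_heartbeat(context, stop_event, interval=30.0):
    """Start a daemon thread that refreshes this plugin's watchdog timestamp.

    The interval must stay well below the documented 120-second limit.
    """
    pid = context["plugin_id"]
    watchdog = context["plugin_watchdog"]

    def _loop():
        while not stop_event.is_set():
            watchdog[pid] = time.time()   # heartbeat
            stop_event.wait(interval)     # sleeps, but wakes early on stop

    thread = threading.Thread(target=_loop, daemon=True, name=f"{pid}-heartbeat")
    thread.start()
    return thread

# Stand-in context for demonstration only.
demo_context = {"plugin_id": "my_plugin", "plugin_watchdog": {}}
demo_stop = threading.Event()
demo_thread = start_heartbeat(demo_context, demo_stop, interval=0.01)
time.sleep(0.1)
demo_stop.set()
demo_thread.join(timeout=1)
```

In a real plugin the call would sit inside init_plugin, and the heartbeat would normally be written from the same loop that does the plugin's actual work, so that a stalled worker also stops heartbeating.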
context["db_manager"]: DatabaseManager instance. Full read/write access to the MeshDash SQLite database: nodes, packets, messages, positions, telemetry, traceroutes, waypoints, neighbor info, connection log, and average metrics history. Thread-safe via WAL mode. See DatabaseManager API below.
context["meshtastic_data"]: MeshtasticData instance. The live in-memory store updated in real time as packets arrive: full node dict, packet deque, local node info, connection status, session stats, and channel map. See MeshtasticData API below.
context["connection_manager"]: MeshtasticConnectionManager. The live radio interface. Call await connection_manager.sendText() to transmit to the mesh. Check connection_manager.is_ready.is_set() before sending. Access connection_manager.interface for the raw Meshtastic interface object (nodes, localNode, requestNodes(), etc.).
context["node_registry"]: The NODE_REGISTRY dict: slot_id → NodeSlot. Each NodeSlot has its own meshtastic_data, db_manager, connection_manager, packet_queue, and sse_queues. Single-radio setups contain only "node_0". See NodeSlot Reference below.
context["event_loop"]: The asyncio event loop. Use asyncio.run_coroutine_threadsafe(coro, event_loop) to schedule async work safely from background threads. This is the only correct way to call sendText or any other coroutine from a thread.
context["logger"]: logging.Logger pre-configured as plugin.{id}. Lines appear in the Plugin Logs panel in the MeshDash UI and in the main server log. The in-memory buffer holds the last 250 lines, viewable at GET /api/system/plugins/{id}/logs.
context["plugin_watchdog"]: Dict mapping str → float. If "watchdog": true in your manifest, write context["plugin_watchdog"][pid] = time.time() (where pid is your plugin ID) inside your background loop at least once every 120 seconds. Failure to do so marks your plugin "hung" and all routes return 503.
context["plugin_id"]: The id string as declared in manifest.json. Use as the watchdog key and in any log messages that need to identify the source plugin.
DatabaseManager API — MeshDash Shared Database
context["db_manager"] connects to the MeshDash SQLite database with a thread-local connection pool (WAL mode, 30-second busy timeout). All built-in methods are safe to call from threads. From async route handlers, always wrap calls with await asyncio.to_thread(fn, *args).
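The effect of asyncio.to_thread can be seen in isolation with a stand-in for a blocking database call. In this sketch fake_db_call is hypothetical and simply sleeps to simulate SQLite I/O; the ticker coroutine shows that the event loop keeps servicing other work while the call runs in a worker thread. It requires Python 3.9 or later.

```python
import asyncio
import time

def fake_db_call(limit):
    """Hypothetical stand-in for a blocking DatabaseManager method."""
    time.sleep(0.2)                      # simulates blocking SQLite I/O
    return [{"row": i} for i in range(limit)]

async def ticker(ticks):
    """Other event-loop work that should not be stalled by the query."""
    for _ in range(3):
        await asyncio.sleep(0.02)
        ticks.append(time.monotonic())

async def handler():
    ticks = []
    tick_task = asyncio.create_task(ticker(ticks))
    # The blocking call runs in a worker thread; the loop stays free.
    rows = await asyncio.to_thread(fake_db_call, 3)
    await tick_task
    return rows, ticks

rows, ticks = asyncio.run(handler())
print(len(rows), "rows,", len(ticks), "ticks recorded")
```

Calling fake_db_call(3) directly inside handler would hold the loop for the full duration of the sleep, which is the stall this section warns about.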
Schema — Tables Available to Plugins
nodes: node_id TEXT PK (e.g. "!aabbccdd"), node_num INT, long_name, short_name, macaddr, hw_model, firmware_version, role, is_local BOOL, last_heard INT (unix), battery_level INT, voltage REAL, channel_utilization REAL, air_util_tx REAL, snr REAL, rssi INT, latitude REAL, longitude REAL, altitude INT, position_time INT, telemetry_time INT, user_info JSON, position_info JSON, device_metrics_info JSON, environment_metrics_info JSON, updated_at DATETIME.
packets: event_id TEXT UNIQUE, timestamp REAL (unix), rx_time INT, from_id, to_id, channel INT, packet_type TEXT ("Message", "Position", "Telemetry", "Node Info", "Traceroute", "Ack", "Waypoint", etc.), rx_snr REAL, rx_rssi INT, hop_limit INT, hop_start INT, want_ack BOOL, decoded JSON text, raw JSON text, source TEXT ("RF"/"MQTT"/"UNKNOWN"), source_confidence REAL 0–1.
messages: packet_event_id TEXT UNIQUE, mesh_packet_id INT, from_id, to_id, channel INT, text TEXT (NULL for encrypted), timestamp REAL, rx_snr REAL, rx_rssi INT, status TEXT ("DELIVERED", "BROADCAST", "ENCRYPTED", "FAILED").
positions: node_id, timestamp REAL, latitude REAL, longitude REAL, altitude INT, precision_bits INT, ground_speed INT, ground_track INT, sats_in_view INT, pdop/hdop/vdop REAL.
telemetry: node_id, timestamp REAL, battery_level INT, voltage REAL, channel_utilization REAL, air_util_tx REAL, uptime_seconds INT, temperature REAL, relative_humidity REAL, barometric_pressure REAL, gas_resistance REAL, iaq REAL.
traceroutes: from_id, to_id, route_path JSON (route_to, route_back, snr_towards, snr_back, rssi, snr, hops_used), timestamp REAL.
neighbors: node_id, neighbor_id, snr REAL, last_seen DATETIME.
waypoints: from_id, waypoint_id INT, name, latitude REAL, longitude REAL, description, timestamp REAL.
hardware_logs: node_id, event_type, details JSON, timestamp REAL.
connection_log: timestamp REAL, status TEXT, value REAL (0.1=disconnected, 0.5=reconnecting, 0.9=connected).
average_metrics_history: timestamp REAL UNIQUE, average_snr REAL, average_rssi REAL, node_count INT. Written every 5 minutes.
Built-in Read Methods
db = context["db_manager"]
nodes = db.get_all_nodes() # Dict[node_id, dict]
packets = db.get_recent_packets(limit=100) # List[dict], newest first
messages = db.get_messages(
from_id="!aabbccdd", # optional sender filter
to_id="^all", # optional recipient filter
channel=0, # optional channel filter
start=time.time()-3600, # optional unix timestamp lower bound
end=time.time(), # optional unix timestamp upper bound
limit=100
)
history = db.get_node_history(
node_id="!aabbccdd",
table="positions", # or "telemetry"
start=None, end=None, limit=1000
)
traceroutes = db.get_traceroutes(limit=100)
neighbors = db.get_neighbors(limit=500)
waypoints = db.get_waypoints(limit=500)
hw_logs = db.get_hardware_logs(limit=100)
conn_hist = db.get_connection_history(limit=100)
metrics = db.get_average_metrics_history(limit=100)
latest_avg = db.get_most_recent_average_metrics() # dict or None
count = db.count_node_items("!aabbccdd", "positions")
results = db.global_search("search term", limit=50)
Built-in Write Methods
db.save_node("!aabbccdd", {
"user": {"longName": "My Node", "shortName": "MN"},
"position": {"latitude": 51.5, "longitude": -0.1},
"deviceMetrics": {"batteryLevel": 85, "voltage": 3.9},
"lastHeard": int(time.time()),
})
db.log_hardware_event("!aabbccdd", "MY_EVENT", {"key": "val"}, time.time())
db.log_connection_status("Connected")
db.save_neighbors("!aabbccdd", [{"nodeId": 0x11223344, "snr": 8.5}])
db.save_traceroute("!aabbccdd", "!11223344", route_list=[0x55667788], timestamp=time.time())
Raw SQL and Plugin-Owned Tables
import asyncio
import json
import logging
import time

# Raw query — always get the connection on the current thread
def my_query(db_manager):
    conn = db_manager._get_connection()
    rows = conn.execute(
        "SELECT node_id, long_name, snr FROM nodes WHERE last_heard > ? ORDER BY snr DESC",
        (time.time() - 3600,)
    ).fetchall()
    return [dict(r) for r in rows]

# From async handler (await is only valid inside an async def):
results = await asyncio.to_thread(my_query, context["db_manager"])

# Creating plugin-owned tables in the shared MeshDash DB
def init_tables(db_manager):
    conn = db_manager._get_connection()
    conn.execute("""
        CREATE TABLE IF NOT EXISTS my_plugin_events (
            id INTEGER PRIMARY KEY,
            node_id TEXT,
            event TEXT,
            payload TEXT,
            timestamp REAL
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_mpe_ts ON my_plugin_events(timestamp DESC)")
    conn.commit()

# Writing to your custom table
def insert_event(db_manager, node_id, event, payload_dict):
    try:
        conn = db_manager._get_connection()
        conn.execute(
            "INSERT INTO my_plugin_events (node_id, event, payload, timestamp) VALUES (?,?,?,?)",
            (node_id, event, json.dumps(payload_dict), time.time())
        )
        conn.commit()
    except Exception as e:
        logging.error("DB insert error: %s", e)
Never call DatabaseManager methods directly inside async def route handlers. DatabaseManager uses blocking SQLite I/O. Calling it inside an async function blocks the entire event loop and stalls all other requests. Always use await asyncio.to_thread(fn, *args).
Using Your Own Separate Database File
For complex schemas, large datasets, or plugins that need to be independently portable or deletable, use your own SQLite file stored inside the plugin directory. Build a small thread-safe manager class following the same pattern MeshDash uses internally:
# plugins/my_plugin/db.py — your plugin's private database
import sqlite3
import threading
import time
import json
import os
import logging

logger = logging.getLogger("plugin.my_plugin")

class PluginDB:
    """Thread-safe SQLite manager using thread-local connections."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._local = threading.local()
        self._init_schema()

    def _get_conn(self) -> sqlite3.Connection:
        """Return thread-local connection, creating it on first call per thread."""
        conn = getattr(self._local, "conn", None)
        if conn is not None:
            try:
                conn.execute("SELECT 1")
                return conn
            except Exception:
                try:
                    conn.close()
                except Exception:
                    pass
                self._local.conn = None
        conn = sqlite3.connect(self.db_path, timeout=30.0)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute("PRAGMA synchronous=NORMAL;")
        conn.execute("PRAGMA busy_timeout=30000;")
        self._local.conn = conn
        return conn

    def _init_schema(self):
        conn = self._get_conn()
        conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY,
                node_id TEXT,
                event TEXT,
                payload TEXT,
                timestamp REAL
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_events_ts ON events(timestamp DESC)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_events_node ON events(node_id)")
        conn.commit()

    def insert_event(self, node_id: str, event: str, payload: dict):
        try:
            conn = self._get_conn()
            conn.execute(
                "INSERT INTO events (node_id, event, payload, timestamp) VALUES (?,?,?,?)",
                (node_id, event, json.dumps(payload), time.time())
            )
            conn.commit()
        except Exception as e:
            logger.error("PluginDB insert error: %s", e)

    def get_recent_events(self, limit=100):
        conn = self._get_conn()
        rows = conn.execute(
            "SELECT * FROM events ORDER BY timestamp DESC LIMIT ?", (limit,)
        ).fetchall()
        result = []
        for r in rows:
            d = dict(r)
            try:
                d["payload"] = json.loads(d["payload"])
            except Exception:
                pass
            result.append(d)
        return result

    def prune_old(self, max_days=30):
        cutoff = time.time() - (max_days * 86400)
        try:
            conn = self._get_conn()
            conn.execute("DELETE FROM events WHERE timestamp < ?", (cutoff,))
            conn.commit()
        except Exception as e:
            logger.error("PluginDB prune error: %s", e)
# plugins/my_plugin/main.py — using your own PluginDB
import os, sys, asyncio, threading, time
from fastapi import APIRouter, Query

PLUGIN_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, PLUGIN_DIR)  # ← allows importing db.py from plugin dir
from db import PluginDB

plugin_router = APIRouter()
_plugin_db: PluginDB = None
_logger = None

@plugin_router.get("/events")
async def get_events(limit: int = Query(50, ge=1, le=500)):
    rows = await asyncio.to_thread(_plugin_db.get_recent_events, limit)
    return {"events": rows}

def init_plugin(context: dict):
    global _plugin_db, _logger
    _logger = context["logger"]
    db_path = os.path.join(PLUGIN_DIR, "data.db")
    _plugin_db = PluginDB(db_path)
    _logger.info("PluginDB ready at %s", db_path)

    def _prune_loop():
        while True:
            time.sleep(86400)
            _plugin_db.prune_old(max_days=30)

    threading.Thread(target=_prune_loop, daemon=True).start()
sys.path.insert(0, PLUGIN_DIR) at the top of main.py allows you to split your plugin across multiple files (db.py, helpers.py, models.py, etc.) and import them with plain import db statements. Without this, Python will not find modules in your plugin directory.
Plugin Config File — Persisting Settings
Store user-configurable settings (thresholds, API keys, feature flags) in a config.json in the plugin directory. Expose it via API routes so it can be updated from the UI or programmatically without restarting.
import asyncio, json, os
from fastapi import APIRouter

plugin_router = APIRouter()
PLUGIN_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_PATH = os.path.join(PLUGIN_DIR, "config.json")
DEFAULTS = {
    "alert_threshold": 15,
    "send_channel": 0,
    "interval_seconds": 60,
    "enabled": True,
}

def load_config() -> dict:
    cfg = DEFAULTS.copy()
    if os.path.exists(CONFIG_PATH):
        try:
            with open(CONFIG_PATH, "r", encoding="utf-8") as f:
                cfg.update(json.load(f))
        except Exception:
            pass  # fall back to defaults on parse error
    return cfg

def save_config(cfg: dict):
    with open(CONFIG_PATH, "w", encoding="utf-8") as f:
        json.dump(cfg, f, indent=2)

@plugin_router.get("/config")
async def get_config():
    return await asyncio.to_thread(load_config)

@plugin_router.post("/config")
async def update_config(body: dict):
    cfg = await asyncio.to_thread(load_config)
    # Only keys that exist in DEFAULTS are accepted; unknown keys are ignored
    cfg.update({k: v for k, v in body.items() if k in DEFAULTS})
    await asyncio.to_thread(save_config, cfg)
    return {"status": "saved", "config": cfg}
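One possible hardening of the save step, offered as an optional sketch and not as part of MeshDash: writing to a temporary file and renaming it into place means a crash mid-write cannot leave a half-written config.json behind. The name save_config_atomic is hypothetical; os.replace and tempfile.mkstemp are standard library calls.

```python
import json
import os
import tempfile

def save_config_atomic(path, cfg):
    """Write cfg as JSON to path via a temp file and an atomic rename."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file must live in the same directory so the rename stays
    # on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(cfg, f, indent=2)
            f.flush()
            os.fsync(f.fileno())      # push the data to disk before renaming
        os.replace(tmp_path, path)    # replaces any existing file in one step
    except BaseException:
        # Clean up the temp file if anything failed before the rename.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

# Demonstration in a throwaway directory.
with tempfile.TemporaryDirectory() as demo_dir:
    demo_path = os.path.join(demo_dir, "config.json")
    save_config_atomic(demo_path, {"alert_threshold": 15})
    save_config_atomic(demo_path, {"alert_threshold": 20})
    with open(demo_path, encoding="utf-8") as f:
        demo_saved = json.load(f)
    demo_files = os.listdir(demo_dir)
```

Because load_config silently falls back to defaults on a parse error, a truncated file would otherwise reset the user's settings without any visible failure.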
MeshtasticData API — Live In-Memory Store
context["meshtastic_data"] is the live node and packet store for the primary slot. It is updated in real time as packets arrive. All attribute access is synchronous and safe from both threads and async code — you are reading Python dicts, not doing I/O.
md = context["meshtastic_data"]
# All known nodes — Dict keyed by node_id ("!aabbccdd")
# Each value contains: node_id, node_num, long_name, short_name, hw_model,
# firmware_version, role, isLocal, lastHeard (unix int), snr, rssi,
# latitude, longitude, altitude, battery_level, voltage,
# channel_utilization, air_util_tx, source, source_confidence,
# user (dict), position (dict), deviceMetrics (dict), environmentMetrics (dict)
nodes: dict = md.nodes
local_id: str = md.local_node_id # "!aabbccdd" or None until first connection
local_info: dict = md.local_node_info # rich dict: firmware, hw, channels, lora config, etc.
status: str = md.connection_status # "Initializing", "Connected", "Reconnecting", etc.
chan_map: dict = md.channel_map # channel_settings_id (int) → channel_index (int)
stats = md.get_serializable_stats() # safe dict for returning from routes
recent = md.get_formatted_packets_from_memory(limit=50) # no DB I/O, fastest
# Useful filtering patterns
gps_nodes = {nid: n for nid, n in md.nodes.items()
if n.get("latitude") is not None}
recent_nodes = {nid: n for nid, n in md.nodes.items()
if (n.get("lastHeard") or 0) > time.time() - 1800}
low_battery = [n for n in md.nodes.values()
if n.get("battery_level") is not None and n.get("battery_level") < 15]
node = md.nodes.get("!aabbccdd")
if node:
    name = node.get("long_name") or node.get("short_name") or node["node_id"]
ConnectionManager — Transmitting to the Mesh
cm = context["connection_manager"]

# Always check before sending (fragment from inside a handler or function)
if not cm.is_ready.is_set():
    logger.warning("Radio not ready")
    return

# Broadcast
await cm.sendText("Hello mesh!", destinationId="^all", channelIndex=0)

# Direct message
await cm.sendText("Hello!", destinationId="!aabbccdd", channelIndex=0)

# From a thread — fire and forget
asyncio.run_coroutine_threadsafe(
    cm.sendText("Ping", destinationId="^all", channelIndex=0),
    context["event_loop"]
)

# From a thread — with result confirmation
future = asyncio.run_coroutine_threadsafe(
    cm.sendText("Ping", destinationId="^all", channelIndex=0),
    context["event_loop"]
)
try:
    future.result(timeout=10)
except Exception as e:
    logger.error("Send failed: %s", e)

# Raw interface access
if cm.interface:
    raw_nodes = cm.interface.nodes       # keyed by node_num (int)
    local_node = cm.interface.localNode
    cm.interface.requestNodes()          # ask mesh to re-broadcast NodeInfo
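The thread-to-loop handoff shown above can be tried end to end without a radio. In this sketch fake_send_text is a hypothetical stand-in for cm.sendText, and the loop started by asyncio.run plays the role of context["event_loop"]; only asyncio.run_coroutine_threadsafe and the future handling correspond to the pattern described here.

```python
import asyncio
import threading

async def fake_send_text(text, destinationId="^all", channelIndex=0):
    """Hypothetical stand-in for connection_manager.sendText."""
    await asyncio.sleep(0.01)
    return {"text": text, "to": destinationId, "channel": channelIndex}

def worker(loop, results):
    """Runs in a plain thread and schedules the coroutine on the loop."""
    future = asyncio.run_coroutine_threadsafe(fake_send_text("Ping"), loop)
    try:
        results.append(future.result(timeout=5))   # blocks this thread only
    except Exception as exc:
        results.append(exc)

async def main():
    loop = asyncio.get_running_loop()
    results = []
    thread = threading.Thread(target=worker, args=(loop, results))
    thread.start()
    # Wait for the worker without blocking the loop it depends on.
    await asyncio.to_thread(thread.join)
    return results

send_results = asyncio.run(main())
print(send_results)
```

Note that the loop must stay free while the worker waits: joining the thread directly inside main would block the loop and the scheduled coroutine would never run.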
NodeSlot Registry — Multi-Radio Access
registry = context["node_registry"]

# NodeSlot fields:
# slot_id, label, meshtastic_data, db_manager, connection_manager,
# packet_queue, tasks, sse_queues, db_uuid
for slot_id, slot in registry.items():
    logger.info("Slot %s: %d nodes, ready=%s",
                slot_id, len(slot.meshtastic_data.nodes),
                slot.connection_manager.is_ready.is_set())

# Per-slot DB query
slot = registry.get("node_1")
if slot:
    msgs = await asyncio.to_thread(slot.db_manager.get_messages, limit=20)

# Send via a specific slot from a thread
if slot and slot.connection_manager.is_ready.is_set():
    asyncio.run_coroutine_threadsafe(
        slot.connection_manager.sendText("Via slot 1", destinationId="^all", channelIndex=0),
        context["event_loop"]
    )

# Aggregate all nodes from all radios
all_nodes = {}
for slot_id, slot in registry.items():
    for nid, node in slot.meshtastic_data.nodes.items():
        all_nodes[f"{slot_id}:{nid}"] = {**node, "heard_by_slot": slot_id}
Static Files — Complete Guide
Your static/ directory is served verbatim by Starlette's StaticFiles — no Python code runs when files are fetched. Put HTML, JS, CSS, images, fonts, or any other static assets there.
URLs and Framing
plugins/my_plugin/static/
├── index.html → /static/plugins/my_plugin/index.html (raw)
├── app.js → /static/plugins/my_plugin/app.js
├── style.css → /static/plugins/my_plugin/style.css
└── bridge.html → /static/plugins/my_plugin/bridge.html
nav_menu href "/plugin/my_plugin/index.html"
→ Loads your page INSIDE the MeshDash chrome (sidebar, header, theme)
→ Font Awesome 6 is already loaded — use <i class="fas fa-..."> freely
→ A "Pop Out" button is injected into the frame header automatically
nav_menu href "/static/plugins/my_plugin/index.html"
→ Raw standalone page — no chrome, no sidebar, no shared CSS
→ You must load Font Awesome yourself if you need icons
HTML Page — Using MeshDash Theme Variables
<!-- plugins/my_plugin/static/index.html -->
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>My Plugin</title>
<link rel="stylesheet" href="/static/plugins/my_plugin/style.css">
</head>
<body>
<div class="plugin-page">
<div class="toolbar">
<h2><i class="fas fa-puzzle-piece"></i> My Plugin</h2>
<button onclick="loadData()"><i class="fas fa-sync"></i> Refresh</button>
</div>
<div id="status" class="status-bar">Loading...</div>
<div id="content"></div>
</div>
<script src="/static/plugins/my_plugin/app.js"></script>
</body>
</html>
CSS — Theme Variables Reference
/* plugins/my_plugin/static/style.css */
/* MeshDash CSS custom properties — available when framed via /plugin/ */
/* --bg1 main page background --bg2 card/panel background */
/* --bg3 hover/input background --txt primary text */
/* --txt2 secondary/muted text --bd2 border colour */
/* --acc accent colour (blue) --sans sans-serif font stack */
/* --mono monospace font stack */
body {
font-family: var(--sans, system-ui, sans-serif);
background: var(--bg1, #1a1a1a);
color: var(--txt, #e0e0e0);
margin: 0;
padding: 0;
}
.plugin-page { padding: 20px; }
.toolbar { display: flex; align-items: center; gap: 12px;
border-bottom: 1px solid var(--bd2, #333); padding-bottom: 10px; margin-bottom: 16px; }
.toolbar h2 { margin: 0; color: var(--txt); font-family: var(--sans); }
.status-bar { font-size: 0.8rem; color: var(--txt2, #888); margin-bottom: 12px; }
.card {
background: var(--bg2, #242424);
border: 1px solid var(--bd2, #333);
border-radius: 6px;
padding: 12px 16px;
margin-bottom: 8px;
}
.card .label { font-size: 0.75rem; color: var(--txt2); text-transform: uppercase; letter-spacing: 0.05em; }
.card .value { font-family: var(--mono, monospace); font-size: 0.9rem; color: var(--txt); margin-top: 4px; }
button {
background: var(--acc, #4a9eff);
color: #fff;
border: none;
border-radius: 4px;
padding: 6px 14px;
cursor: pointer;
font-size: 0.82rem;
}
button:hover { opacity: 0.85; }
table { width: 100%; border-collapse: collapse; font-size: 0.85rem; }
th { background: var(--bg2); color: var(--txt2); font-weight: 600;
padding: 8px 10px; text-align: left; border-bottom: 1px solid var(--bd2); }
td { padding: 7px 10px; border-bottom: 1px solid var(--bd2); color: var(--txt); }
tr:hover td { background: var(--bg3, #2a2a2a); }
JavaScript — Calling Your Plugin API
// plugins/my_plugin/static/app.js
const API = "/api/plugins/my_plugin";
// ── Basic fetch helper with error handling ────────────────────────────────
async function apiFetch(path, options = {}) {
const resp = await fetch(`${API}${path}`, {
headers: { "Content-Type": "application/json" },
...options,
});
if (!resp.ok) {
const err = await resp.json().catch(() => ({}));
throw new Error(err.detail || `HTTP ${resp.status}`);
}
return resp.json();
}
// ── Load data and render ──────────────────────────────────────────────────
async function loadData() {
document.getElementById("status").textContent = "Loading...";
try {
const data = await apiFetch("/nodes");
renderNodes(data.nodes);
document.getElementById("status").textContent =
`${data.nodes.length} nodes — updated ${new Date().toLocaleTimeString()}`;
} catch (err) {
document.getElementById("status").textContent = `Error: ${err.message}`;
console.error(err);
}
}
function renderNodes(nodes) {
const html = nodes.map(n => `
<div class="card">
<div class="label">${n.id}</div>
<div class="value">${n.name || n.id}
${n.battery != null ? ` • 🔋 ${n.battery}%` : ""}
${n.snr != null ? ` • SNR ${n.snr}dB` : ""}
</div>
</div>`).join("");
document.getElementById("content").innerHTML = html || "<p>No nodes yet.</p>";
}
// ── POST example ─────────────────────────────────────────────────────────
async function sendMessage(text, destination = "^all") {
try {
const result = await apiFetch("/send", {
method: "POST",
body: JSON.stringify({ text, destination }),
});
console.log("Sent:", result);
} catch (err) {
alert(`Send failed: ${err.message}`);
}
}
// ── Polling ───────────────────────────────────────────────────────────────
let _pollTimer = null;
function startPolling(ms = 10000) {
stopPolling();
_pollTimer = setInterval(loadData, ms);
}
function stopPolling() {
if (_pollTimer) { clearInterval(_pollTimer); _pollTimer = null; }
}
// ── Auto-start ────────────────────────────────────────────────────────────
document.addEventListener("DOMContentLoaded", () => {
loadData();
startPolling(15000);
});
Bridge Page — Persistent Background JS
A bridge page is a hidden iframe that MeshDash loads ~800ms after startup. It is never shown to the user. Use it for persistent background tasks: WebSocket listeners, push notification checks, periodic sync from your plugin's backend.
<!-- plugins/my_plugin/static/bridge.html -->
<!DOCTYPE html><html><head><meta charset="utf-8"></head>
<body><script>
const API = "/api/plugins/my_plugin";
async function bgCheck() {
try {
const data = await fetch(`${API}/alerts`).then(r => r.json());
if (data.alerts && data.alerts.length) {
// Send a message to the parent MeshDash window
window.parent.postMessage({
type: "my_plugin_alert",
count: data.alerts.length,
latest: data.alerts[data.alerts.length - 1]
}, "*");
}
} catch(e) {}
}
setInterval(bgCheck, 30000);
bgCheck();
</script></body></html>
Route Patterns — FastAPI Features Available in Plugins
from fastapi import APIRouter, HTTPException, Query, Path, Body, BackgroundTasks, Request
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
from typing import Optional
plugin_router = APIRouter()
# Path parameters
@plugin_router.get("/node/{node_id}")
async def get_node(node_id: str = Path(..., pattern=r"^![0-9a-f]{8}$")):
    node = _md.nodes.get(node_id)
    if not node:
        raise HTTPException(status_code=404, detail=f"Node {node_id} not found")
    return node
# Query parameters with validation
@plugin_router.get("/messages")
async def get_messages(
    channel: int = Query(0),
    limit: int = Query(50, ge=1, le=500),
    since: Optional[float] = Query(None)
):
    return {"messages": await asyncio.to_thread(
        _db.get_messages, channel=channel, start=since, limit=limit
    )}
# Pydantic request body
class SendRequest(BaseModel):
    text: str
    destination: str = "^all"
    channel: int = 0
@plugin_router.post("/send")
async def send_message(req: SendRequest):
    if not _cm or not _cm.is_ready.is_set():
        raise HTTPException(503, "Radio not connected")
    await _cm.sendText(req.text, destinationId=req.destination, channelIndex=req.channel)
    return {"status": "sent"}
# Background task (returns immediately, work runs after response)
@plugin_router.post("/trigger")
async def trigger(background_tasks: BackgroundTasks):
    async def _work():
        await asyncio.sleep(1)
        # Check readiness here too: the radio may not be connected when the task runs
        if _cm and _cm.is_ready.is_set():
            await _cm.sendText("Triggered!", destinationId="^all", channelIndex=0)
    background_tasks.add_task(_work)
    return {"status": "queued"}
# CSV export via streaming response
@plugin_router.get("/export.csv")
async def export_csv():
    def generate():
        yield "node_id,name,battery,snr,lat,lon\n"
        # Values are written unquoted: a name containing a comma will shift columns.
        # Copy the items so a concurrent node update cannot break iteration.
        for nid, n in list(_md.nodes.items()):
            yield f"{nid},{n.get('long_name','')},{n.get('battery_level','')},{n.get('snr','')},{n.get('latitude','')},{n.get('longitude','')}\n"
    return StreamingResponse(generate(), media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=nodes.csv"})
# Raw request body (webhooks, arbitrary JSON)
@plugin_router.post("/webhook")
async def webhook(request: Request):
    body = await request.json()
    # The payload may be a JSON array or scalar, which has no keys
    return {"received": True, "keys": list(body.keys()) if isinstance(body, dict) else []}
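The CSV export route above joins fields with an f-string, so a node name containing a comma or a quote would corrupt the row. A hedged alternative is to let the standard csv module handle quoting. In this sketch iter_csv and FIELDS are hypothetical names, and the node keys are the ones the route above already reads.

```python
import csv
import io
from typing import Iterator, Mapping

FIELDS = ("long_name", "battery_level", "snr", "latitude", "longitude")


def iter_csv(nodes: Mapping[str, Mapping]) -> Iterator[str]:
    """Yield CSV text one row at a time, with quoting handled by csv.writer."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")

    def flush() -> str:
        # Hand back what the writer produced and reset the buffer for the next row.
        text = buf.getvalue()
        buf.seek(0)
        buf.truncate(0)
        return text

    writer.writerow(["node_id", "name", "battery", "snr", "lat", "lon"])
    yield flush()
    # Copy the items first so a concurrent update cannot break iteration.
    for nid, node in list(nodes.items()):
        writer.writerow([nid, *(node.get(f, "") for f in FIELDS)])
        yield flush()


sample = {"!a1b2c3d4": {"long_name": "Base, North", "battery_level": 87, "snr": 6.5}}
rows = list(iter_csv(sample))
```

Because StreamingResponse accepts any iterator of strings, the generator could be passed in place of generate() in the route above, for example StreamingResponse(iter_csv(_md.nodes), media_type="text/csv").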
Plugin Lifecycle and Status Values
loading
running
stopped: A .disabled file exists. Routes return 503. Persists across restarts.
crashed: An error occurred in init_plugin. The error field in GET /api/system/plugins contains the exception. Routes return 503.
hung: The plugin declares "watchdog": true and has not heartbeated for 120 seconds. Routes return 503. Requires a server restart to recover.
invalid_manifest: manifest.json failed validation (missing watchdog, invalid ID characters, empty or malformed JSON). Plugin directory is ignored entirely.
pending_restart: Set when a crashed or hung plugin is started again. Requires a server restart to reload code.
Plugin Management API
GET /api/system/plugins
GET /api/system/plugins/{id}/logs: Log entries with fields t (unix float), lvl (level name), msg (formatted line).
DELETE /api/system/plugins/{id}/logs
POST /api/system/plugins/{id}/toggle?action=stop: Creates .disabled. Routes return 503 immediately. Persists across restarts.
POST /api/system/plugins/{id}/toggle?action=start: Removes .disabled. If crashed/hung, sets pending_restart, which requires a server restart to reload code.
POST /api/system/plugins/install: Installs from a .zip containing the plugin directory. Validates manifest. Requires restart to activate.
POST /api/system/plugins/install-remote: Installs from a remote .zip. Same validation.
DELETE /api/system/plugins/{id}
GET /api/system/plugins/menu
GET /api/plugins/bridges: Plugins that declare a "bridge" manifest key.
Disabling a Plugin
# Via filesystem
touch /opt/meshdash/plugins/my_plugin/.disabled
# Via API
POST /api/system/plugins/my_plugin/toggle?action=stop
# Re-enable
rm /opt/meshdash/plugins/my_plugin/.disabled
# or
POST /api/system/plugins/my_plugin/toggle?action=start
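The filesystem route above is just creating or removing a sentinel file, so it can be scripted from Python as well as from a shell. This is a minimal sketch. set_plugin_enabled is a hypothetical helper, and the demo runs against a temporary directory rather than a real install.

```python
import tempfile
from pathlib import Path


def set_plugin_enabled(plugins_root: Path, plugin_id: str, enabled: bool) -> bool:
    """Create or remove the .disabled sentinel; return True if the plugin is now enabled."""
    plugin_dir = plugins_root / plugin_id
    if not plugin_dir.is_dir():
        raise FileNotFoundError(f"No plugin directory: {plugin_dir}")
    sentinel = plugin_dir / ".disabled"
    if enabled:
        sentinel.unlink(missing_ok=True)  # same effect as: rm -f .disabled
    else:
        sentinel.touch()                  # same effect as: touch .disabled
    return not sentinel.exists()


# Demonstration against a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "my_plugin").mkdir()
    after_disable = set_plugin_enabled(root, "my_plugin", enabled=False)
    after_enable = set_plugin_enabled(root, "my_plugin", enabled=True)
```

On a real install plugins_root would be the plugins directory, for example Path("/opt/meshdash/plugins") as in the shell commands above.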
Porting a Standalone Python App to a Plugin
If you have an existing standalone script that talks to Meshtastic — a bot, logger, or web service — it can be wrapped into a plugin with a predictable series of changes. The core transformation is: remove your connection setup (MeshDash owns it), replace interface calls with injected context objects, and convert your HTTP server into a FastAPI APIRouter.
Step 1 — Remove Connection Setup
# ✗ BEFORE
import meshtastic.tcp_interface
from pubsub import pub
iface = meshtastic.tcp_interface.TCPInterface("192.168.1.100")
pub.subscribe(on_receive, "meshtastic.receive")
# ✓ AFTER — delete all of the above.
# MeshDash owns the connection. Read data through context["meshtastic_data"].
Step 2 — Replace sendText
# ✗ BEFORE
iface.sendText("hello", destinationId="^all", channelIndex=0)
# ✓ AFTER — from async handler
await _cm.sendText("hello", destinationId="^all", channelIndex=0)
# ✓ AFTER — from a thread
asyncio.run_coroutine_threadsafe(
_cm.sendText("hello", destinationId="^all", channelIndex=0), _loop
)
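To see the thread-side call in isolation, the sketch below runs an event loop in one thread and schedules a coroutine on it from another, then waits for the result. FakeConnectionManager and send_from_thread are stand-ins invented for the demo. In a plugin, the manager and loop come from the context.

```python
import asyncio
import threading


class FakeConnectionManager:
    """Stand-in for the injected connection manager (assumption, demo only)."""

    def __init__(self):
        self.sent = []

    async def sendText(self, text, destinationId="^all", channelIndex=0):
        self.sent.append((text, destinationId, channelIndex))
        return len(self.sent)


def send_from_thread(cm, loop, text, timeout=5.0):
    """Schedule cm.sendText on `loop` from a worker thread and wait for the result."""
    future = asyncio.run_coroutine_threadsafe(
        cm.sendText(text, destinationId="^all", channelIndex=0), loop
    )
    # result() blocks this thread and re-raises any exception from the coroutine.
    return future.result(timeout=timeout)


# Demo: the loop runs in a background thread; the call comes from this thread.
loop = asyncio.new_event_loop()
runner = threading.Thread(target=loop.run_forever, daemon=True)
runner.start()
cm = FakeConnectionManager()
result = send_from_thread(cm, loop, "hello")
loop.call_soon_threadsafe(loop.stop)
runner.join(timeout=5)
loop.close()
```

Waiting on future.result() is optional, but it surfaces send errors that would otherwise be lost. It must only be called from a thread other than the one running the loop, or it will deadlock.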
Step 3 — Replace Your HTTP Server
# ✗ BEFORE (Flask)
from flask import Flask, jsonify
app = Flask(__name__)
@app.route("/status")
def status():
    return jsonify({"nodes": len(iface.nodes)})
if __name__ == "__main__":
    app.run(port=5000)
# ✓ AFTER (plugin router: no app.run, no port)
from fastapi import APIRouter
plugin_router = APIRouter()
@plugin_router.get("/status")
async def status():
    return {"nodes": len(_md.nodes)}
Step 4 — Replace Node Access
# ✗ BEFORE (raw interface, keyed by node_num int)
for num, node in iface.nodes.items():
    print(node.get("user", {}).get("longName"))
# ✓ AFTER (in-memory store, keyed by "!hex" string)
for nid, node in _md.nodes.items():
    print(node.get("long_name") or node.get("user", {}).get("longName", nid))
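The name fallback in the AFTER snippet tends to get repeated wherever nodes are displayed, so it may be worth factoring into one function. A minimal sketch follows. display_name is a hypothetical helper, and it additionally tolerates a user value of None, which the inline version does not.

```python
from typing import Mapping


def display_name(node_id: str, node: Mapping) -> str:
    """Prefer the flat long_name, then the nested user.longName, then the node ID."""
    user = node.get("user") or {}
    return node.get("long_name") or user.get("longName") or node_id


names = [
    display_name("!a1b2c3d4", {"long_name": "Base"}),
    display_name("!a1b2c3d5", {"user": {"longName": "Rover"}}),
    display_name("!a1b2c3d6", {"user": None}),
]
```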
Step 5 — Move State into init_plugin
from fastapi import APIRouter
import threading, time, asyncio, os, sys
PLUGIN_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, PLUGIN_DIR)
plugin_router = APIRouter()
_cm = _md = _db = _loop = _log = None
@plugin_router.get("/data")
async def get_data():
    rows = await asyncio.to_thread(_db.get_recent_packets, 50)
    return {"packets": rows}
def _worker():
    while True:
        try:
            nodes = list(_md.nodes.values())  # synchronous read, safe in thread
            _log.info("%d nodes visible", len(nodes))
            if _cm.is_ready.is_set():
                asyncio.run_coroutine_threadsafe(
                    _cm.sendText("ping", destinationId="^all", channelIndex=0), _loop
                )
        except Exception as e:
            _log.error("Worker: %s", e)
        time.sleep(300)
def init_plugin(context: dict):
    global _cm, _md, _db, _loop, _log
    _cm = context["connection_manager"]
    _md = context["meshtastic_data"]
    _db = context["db_manager"]
    _loop = context["event_loop"]
    _log = context["logger"]
    threading.Thread(target=_worker, daemon=True).start()
    _log.info("Plugin ready")
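The worker above sleeps for 300 seconds between passes, which suits a plugin with "watchdog": false. With "watchdog": true the loop has to write a heartbeat at least every 120 seconds, so one long sleep would get the plugin marked hung. The sketch below separates the heartbeat interval from the work interval. The _stop event, the extra init_plugin parameters, the returned thread, and the fake context exist only to make the demo testable and are assumptions, not MeshDash API.

```python
import logging
import threading
import time
from types import SimpleNamespace

PLUGIN_ID = "my_plugin"        # must equal the manifest "id"
_md = _log = _watchdog = None
_stop = threading.Event()      # hypothetical: lets a test or shutdown hook end the loop


def _worker(work_every: float, beat_every: float) -> None:
    next_work = 0.0
    while not _stop.is_set():
        _watchdog[PLUGIN_ID] = time.time()      # heartbeat, well inside 120 s
        if time.monotonic() >= next_work:
            try:
                _log.info("%d nodes visible", len(list(_md.nodes.values())))
            except Exception as exc:
                _log.error("Worker: %s", exc)
            next_work = time.monotonic() + work_every
        _stop.wait(beat_every)                  # sleeps, but wakes early on stop


def init_plugin(context: dict, work_every: float = 300.0, beat_every: float = 30.0):
    global _md, _log, _watchdog
    _md = context["meshtastic_data"]
    _log = context["logger"]
    _watchdog = context["plugin_watchdog"]
    thread = threading.Thread(target=_worker, args=(work_every, beat_every), daemon=True)
    thread.start()
    return thread


# Demo with a fake context; real values are injected by MeshDash.
fake_context = {
    "meshtastic_data": SimpleNamespace(nodes={"!a1b2c3d4": {}}),
    "logger": logging.getLogger("my_plugin_demo"),
    "plugin_watchdog": {},
}
worker = init_plugin(fake_context, beat_every=0.01)
time.sleep(0.1)
_stop.set()
worker.join(timeout=2)
```

Using Event.wait instead of time.sleep keeps the heartbeat cadence independent of how often the real work runs.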
Step 6 — Database: Share or Separate
Add custom tables to the shared MeshDash DB (simplest — one file, full access to existing data):
def init_plugin(context: dict):
    conn = context["db_manager"]._get_connection()
    conn.execute("CREATE TABLE IF NOT EXISTS my_table (id INTEGER PRIMARY KEY, ...)")
    conn.commit()
Or keep a fully separate DB file in your plugin directory for full isolation (see Using Your Own Separate Database above). Pick shared when your queries need to JOIN against mesh data. Pick separate when your schema is complex and independent.
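For the separate-file option, a minimal sketch using only the standard library might look like the following. The readings table, the helper names, and the one-connection-per-call approach are assumptions chosen for the example. The data.db filename follows the directory layout shown earlier, and the demo writes to a temporary directory instead of a real plugin directory.

```python
import asyncio
import os
import sqlite3
import tempfile
from contextlib import closing


def open_db(plugin_dir: str) -> str:
    """Create the plugin's own DB file and schema if missing; return its path."""
    path = os.path.join(plugin_dir, "data.db")
    with closing(sqlite3.connect(path)) as conn, conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS readings ("
            "id INTEGER PRIMARY KEY, node_id TEXT NOT NULL, value REAL, ts REAL)"
        )
    return path


def add_reading(db_path: str, node_id: str, value: float, ts: float) -> None:
    # The inner `conn` context commits; closing() then closes the connection.
    with closing(sqlite3.connect(db_path)) as conn, conn:
        conn.execute(
            "INSERT INTO readings (node_id, value, ts) VALUES (?, ?, ?)",
            (node_id, value, ts),
        )


def recent_readings(db_path: str, limit: int = 50) -> list:
    with closing(sqlite3.connect(db_path)) as conn:
        cur = conn.execute(
            "SELECT node_id, value, ts FROM readings ORDER BY ts DESC LIMIT ?",
            (limit,),
        )
        return cur.fetchall()


async def demo(plugin_dir: str) -> list:
    # Blocking sqlite3 calls are pushed to a thread, as an async handler would do.
    db_path = await asyncio.to_thread(open_db, plugin_dir)
    await asyncio.to_thread(add_reading, db_path, "!a1b2c3d4", 3.7, 1.0)
    await asyncio.to_thread(add_reading, db_path, "!a1b2c3d4", 3.9, 2.0)
    return await asyncio.to_thread(recent_readings, db_path, 1)


with tempfile.TemporaryDirectory() as tmp:
    latest = asyncio.run(demo(tmp))
```

Opening a connection per call avoids sharing one sqlite3 connection across threads, at the cost of a little overhead on each query.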
Common Porting Pitfalls
- Do not call pub.subscribe. Subscribing again from a plugin registers a second listener and double-processes every packet. Read from meshtastic_data.nodes or the DB instead.
- Do not start threads at module scope. Module-level code executes during import under the 10-second timeout. All thread creation must happen inside init_plugin.
- Do not open your own Meshtastic interface. Two interfaces on the same serial port or TCP host will conflict and disconnect each other.
- Do not call asyncio.run() from a thread. It creates a new event loop that conflicts with the main one. Use asyncio.run_coroutine_threadsafe(coro, event_loop).
- Do not call blocking DB methods from async def. Wrap every DB call in await asyncio.to_thread(fn, args).
- Do not use os.getcwd() for file paths. The working directory is the MeshDash install directory, not your plugin directory. Always derive paths from os.path.dirname(os.path.abspath(__file__)).
- Do not import setup-installed packages at module scope. Those imports run before init_plugin, before pip has installed anything. Import them inside init_plugin after confirming the sentinel exists.
- Do not import from other plugin directories. There is no guaranteed plugin load order. Never import plugins.other_plugin.something; plugins are isolated modules.
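The pitfall about setup-installed packages can be handled with a small guard that imports a module only once the .setup_complete sentinel is present. A minimal sketch follows. import_after_setup is a hypothetical helper, and the standard-library json module stands in for a third-party package so the example runs anywhere.

```python
import importlib
import os
import tempfile


def import_after_setup(plugin_dir: str, module_name: str):
    """Import module_name only if the .setup_complete sentinel exists; else return None."""
    sentinel = os.path.join(plugin_dir, ".setup_complete")
    if not os.path.exists(sentinel):
        return None
    return importlib.import_module(module_name)


with tempfile.TemporaryDirectory() as tmp:
    # No sentinel yet: the import is skipped.
    before = import_after_setup(tmp, "json")
    # Simulate a finished setup, then try again.
    open(os.path.join(tmp, ".setup_complete"), "w").close()
    after = import_after_setup(tmp, "json")
```

Inside init_plugin the call would use PLUGIN_DIR, and a None result is the cue to log a warning and leave the dependent features switched off.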
Full Checklist Before Shipping a Plugin
manifest.json
✓ "id" — alphanumeric/underscores/hyphens only, matches directory name exactly
✓ "watchdog" — explicitly true or false, NEVER omitted
main.py
✓ PLUGIN_DIR = os.path.dirname(os.path.abspath(__file__)) — not os.getcwd()
✓ sys.path.insert(0, PLUGIN_DIR) — if you have helper modules
✓ plugin_router = APIRouter() — if you have routes
✓ def init_plugin(context: dict): — if you need any service
✓ All context values stored in module globals — not passed to threads
✓ Setup sentinel checked before importing setup-installed packages
✓ All setup-installed packages imported inside init_plugin, not at module scope
✓ await asyncio.to_thread() for all DB calls from async handlers
✓ asyncio.run_coroutine_threadsafe() for all async calls from threads
✓ context["plugin_watchdog"][pid] = time.time() in loop — if watchdog: true
✓ threading.Thread(..., daemon=True) — all background threads
✓ context["logger"] for all log output — not logging.getLogger()
✓ cm.is_ready.is_set() checked before every sendText call
✓ All DB writes wrapped in try/except
static/
✓ All file paths in JS use /static/plugins/{id}/filename — never relative paths
✓ CSS uses var(--bg1), var(--txt) etc. for theme compatibility
✓ Font Awesome icons work without extra script tag when framed via /plugin/
✓ API calls use /api/plugins/{id}/endpoint as base URL
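The two manifest.json items on the checklist are easy to verify mechanically before shipping. The sketch below mirrors those checklist rules only. It is not MeshDash's own validator, and check_manifest and its messages are hypothetical.

```python
import json
import os
import re
import tempfile

ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]+$")


def check_manifest(plugin_dir: str) -> list:
    """Return a list of problems found in plugin_dir/manifest.json (empty if none)."""
    path = os.path.join(plugin_dir, "manifest.json")
    try:
        with open(path, encoding="utf-8") as fh:
            manifest = json.load(fh)
    except (OSError, json.JSONDecodeError) as exc:
        return [f"manifest.json unreadable: {exc}"]
    if not isinstance(manifest, dict):
        return ["manifest.json must contain a JSON object"]

    problems = []
    plugin_id = manifest.get("id")
    if not isinstance(plugin_id, str) or not ID_PATTERN.match(plugin_id):
        problems.append("id must use only letters, digits, underscores, hyphens")
    elif plugin_id != os.path.basename(os.path.normpath(plugin_dir)):
        problems.append("id does not match the directory name")
    # bool check on purpose: a missing key, null, or "false" string all fail.
    if not isinstance(manifest.get("watchdog"), bool):
        problems.append("watchdog must be explicitly true or false")
    return problems


with tempfile.TemporaryDirectory() as tmp:
    plugin_dir = os.path.join(tmp, "my_plugin")
    os.mkdir(plugin_dir)
    manifest_path = os.path.join(plugin_dir, "manifest.json")
    with open(manifest_path, "w", encoding="utf-8") as fh:
        json.dump({"id": "my_plugin", "watchdog": False}, fh)
    good = check_manifest(plugin_dir)
    with open(manifest_path, "w", encoding="utf-8") as fh:
        json.dump({"id": "other"}, fh)
    bad = check_manifest(plugin_dir)
```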