Objetivos
Material requerido.
Una servidor MQTT |
Reemplazando Telegraf por Python
En el modelo de nuestro servidor IOT, que hemos ido desarrollando en las últimas sesiones usábamos Telegraf para suscribir los tópicos de nuestro interés e insertarlos en Influx DB sobre la marcha. Telegraf es una utilidad magnifica y sin duda tendería a usarla cuando el número de medidas o puntos a insertar fuera elevado, pero tiene un pequeño inconveniente; No podemos ver si nuestros sensores están en marcha o se han parado en algún momento.
Vemos, Telegraf está muy bien, pero cuando encendemos nuestra Raspberry para que todo se inicie según lo previsto, no tenemos forma de comprobar que todo fluye correctamente, más que usando Grafana y viendo que las medidas entran. Y cuando fallan… ¿Como saber lo que falla? ¿Será un problema del MQTT? ¿O quizás de Telegraf… o de Influx?
No cabe duda que todo esto es un tanto insatisfactorio (Cuando hay problemas) por lo que si yo quiero comprobar que el proceso va fino voy a necesitar algo que me de la tranquilidad de ver como son las cosas (Uno es perro viejo ya) y para eso, nada como usar un script Python que me deje fisgar entre los procesos y ver qué pasa.
Por eso en esta ocasión vamos a escribir ese script que como veremos enseguida es bastante fácil con mi lenguaje favorito: Python.
Suscribiendo tópicos en Python 3
Como era de esperar alguien se ha molestado en escribir una librería Python para conectar con el servidor MQTT y es de lo más fácil de usar. Se llama paho-mqtt y lo puedes instalar en tu Raspberry con el comando:
pip install paho-mqtt
Con la librería instalada podemos iniciar nuestro programa Python importándola:
import paho.mqtt.client as mqtt
Antes de empezar con el código principal del programa que nos suscriba al Boker MQTT, tenemos que definir un par de funciones que nos informen de que ha conectado correctamente, y otra que recoja el mensaje que nos envía el Mosquitto (O similar) cuando nos envié un tópico al que nos hayamos suscrito.
La función que nos informe de la conexión con el Broker seria:
def on_connect(client, userdata, flags, rc): print("Connected with result code "+str(rc)) client.subscribe("Prosensor/+")
Como estoy usando el sensor SCD41 para publicar valores en el MQTT, hay tres tópicos que me interesan: CO2, Temperatura y Humedad, y en lugar de suscribirlos uno por uno, he optado por usar el comodín “+”, para indicar que me interesan todos los tópicos que se publique baja el paraguas de “Prosensor”, y de paso, si en el futuro añado más sensores como Volatile Organic Compounds o VOCs, no tendré que tocar el programa Python para verlos.
La segunda función Callback es la que recibe directamente los mensajes que nos envían el Broker MQTT:
def on_message(client, userdata, msg): print(msg.payload.decode())
Que no puede ser más sencilla y lo único que hace (Por ahora) es imprimir los mensajes entrantes. Y esto nos permite empezar directamente con el programa principal, y en particular definir la conexión al broker MQTT:
client = mqtt.Client() client.username_pw_set("charly", "contrase") #set username and password client.on_message= on_message client.connect("localhost",1883,60)
La primera línea crea una instancia del cliente MQTT, y en la segunda, definimos el usuario a usar y en la tercera línea conectamos la función Callback que hemos definido arriba con el evento de entrada de mensajes (Es una forma complicada de decir que cuando hay un evento de mensaje de entrada se llame a la función on_message que hemos definido mas arriba)
Ahora ya podemos conectar con el servidor MQTT mediante la instrucción:
client.connect("localhost",1883,60)
Solo nos queda conectar la segunda función Callback al evento de conexión:
client.on_connect = on_connect
y llamar al bucle de leer el MQTT (Imprescindible. ¡No olvidar!):
client.loop_forever()
Y eso es todo. El programa completo es simplemente: Mqtt
import paho.mqtt.client as mqtt def on_connect(client, userdata, flags, rc): print("Connected with result code "+str(rc)) client.subscribe("Prosensor/+") def on_message(client, userdata, msg): print(msg.payload.decode()) client = mqtt.Client() client.username_pw_set("charly", "contrase") #set username and password client.on_message= on_message client.connect("localhost",1883,60) client.on_connect = on_connect client.on_message = on_message #while True: client.loop_forever()
Si ejecutamos el script desde una ventana de comandos, veremos algo como esto:
Vale, ha sido hasta demasiado fácil. Y esto nos lleva a la siguiente pregunta: ¿Podríamos hacer las veces del Telegraf para insertar los valores que leamos desde Python? Naturalmente, la respuesta es que si (Faltaría más) y no es mucho más complicado.
Accediendo a Influx DB desde Python
Para acceder a Influx DB desde Python 3, tenemos que instalar la librería correspondiente, llamada Influxdb con el sorprendente comando de consola:
pip install Influxdb
Ahora ya podemos empezar con el script, empezando por hacer import de la librería:
from influxdb import InfluxDBClient
Tendremos que conectar a Influx creando una instancia de cliente:
Iclient = InfluxDBClient('192.168.1.52','8086','charly','contrase','test')
Donde puedes ver la dirección IP del servidor Influx, el puerto de escucha, 8086, y mi usuario y contraseña para acceder a la BBDD test (En último lugar).
Los lectores menos avezados podrían pensar que la parte difícil será conseguir que la función on_message, que recibe el mensaje del broker MQTT, lo inserte en Influx, pero no. En realidad, es más bien poca cosa. Vamos a reescribir esta función así:
def on_message(client, userdata, msg): payload = msg.payload.decode() print(payload) Iclient.write_points(payload, protocol='line')
Como los mensajes que publicamos en los tópicos corresponden a instrucciones validas para Influx DB, el único cambio que tenemos que hacer es mandar el payload a un write del cliente Influx, informándole de que el protocolo es la sintaxis habitual que ya vimos de Influx que llama ‘line’, en contraposición a otras posibilidades que podrían ser JSON, por ejemplo.
Así que el programa final, es decepcionantemente sencillo:
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
def on_connect(client, userdata, flags, rc):
print(«Connected with result code «+str(rc))
client.subscribe(«Prosensor/+»)
def on_message(client, userdata, msg):
payload = msg.payload.decode()
print(payload)
Iclient.write_points(payload, protocol=’line’)
client = mqtt.Client()
client.username_pw_set(«charly», «contrase») #set username and password
client.on_message= on_message
client.connect(«localhost»,1883,60)
client.on_connect = on_connect
client.on_message = on_message
Iclient = InfluxDBClient(‘192.168.1.52′,’8086′,’charly’,’contrase’,’test’)
#while True:
client.loop_forever()
Para probar nuestro pequeño programa Python, tienes que parar el Telegraf. De este modo ya solo Python podrá insertar puntos en Influx BD y podremos comprobar que funciona:
Systemctl stop Influx
Y ahora ya puedes ejecutar el script que os dejo aquí debajo:
Como las gráficas de Grafana salían un poco sosas le he dado una buena soplada al sensor para menear un poco el cotarro y el resultado está a la vista.