smontoya smontoya - 1 month ago 21
Python Question

MQTT Paho Python Client subscriber, how subscribe forever?

Try to simple subscribe without disconnect to a Mosquitto broker to get all messages from devices that are publishing data in a specific topic, save them in a BD and Post them to a php that do "staff".

Here is my subscribe.py

import paho.mqtt.client as mqtt
from mqtt_myapp import *

topic="topic/#" # MQTT broker topic
myclient="my-paho-client" # MQTT broker My Client
user="user" # MQTT broker user
pw="pass" # MQTT broker password
host="localhost" # MQTT broker host
port=1883 # MQTT broker port
value="123" # somethin i need for myapp

def on_connect(mqttc, userdata, rc):
print('connected...rc=' + str(rc))
mqttc.subscribe(topic, qos=0)

def on_disconnect(mqttc, userdata, rc):
print('disconnected...rc=' + str(rc))

def on_message(mqttc, userdata, msg):
print('message received...')
print('topic: ' + msg.topic + ', qos: ' +
str(msg.qos) + ', message: ' + str(msg.payload))
save_to_db(msg)
post_data(msg.payload,value)

def on_subscribe(mqttc, userdata, mid, granted_qos):
print('subscribed (qos=' + str(granted_qos) + ')')

def on_unsubscribe(mqttc, userdata, mid, granted_qos):
print('unsubscribed (qos=' + str(granted_qos) + ')')

mqttc = mqtt.Client(myclient)
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.on_message = on_message
mqttc.on_subscribe = on_subscribe
mqttc.on_unsubscribe = on_unsubscribe
mqttc.username_pw_set(user,pw)
mqttc.connect(host, port, 60)
mqttc.loop_forever()


Here is my mqtt_myapp.py:

import MySQLdb
import requests # pip install requests

url = "mydomain/data_from_broker.php"

def save_to_db(msg):
with db:
cursor = db.cursor()
try:
cursor.execute("INSERT INTO MQTT_LOGS (topic, payload) VALUES (%s,%s)", (msg.topic, msg.payload))
except (MySQLdb.Error, MySQLdb.Warning) as e:
print('excepttion BD ' + e)
return None

def post_data(payload,value):
datos = {'VALUE': value,'data-from-broker': payload}
r = requests.post(url, datos)
r.status_code
print('response POST' + str(r.status_code))

db = MySQLdb.connect("localhost","user_db","pass_db","db" )


When I run my python script on background with
python -t mqtt_subscribe.py &
I get messages publish for other clients, but after some hours of my subscribe.py script running, socket error happens.

Mosquito.log:

...
1475614815: Received PINGREQ from my-paho-client
1475614815: Sending PINGRESP to my-paho-client
1475614872: New connection from xxx.xxx.xxx.xxx on port 1883.
1475614872: Client device1 disconnected.
1475614872: New client connected from xxx.xxx.xxx.xxx as device1(c0, k0, u'user1').
1475614872: Sending CONNACK to device1(0, 0)
1475614873: Received PUBLISH from device1(d0, q1, r0, m1, 'topic/data', ... (33 bytes))
1475614873: Sending PUBACK to device1 (Mid: 1)
1475614873: Sending PUBLISH to my-paho-client (d0, q0, r0, m0, 'topic/data', ... (33 bytes))
1475614874: Received DISCONNECT from device1
1475614874: Client device1 disconnected.
...
1475625566: Received PINGREQ from my-paho-client
1475625566: Sending PINGRESP to my-paho-client
1475625626: Received PINGREQ from my-paho-client
1475625626: Sending PINGRESP to my-paho-client
1475625675: New connection from xxx.xxx.xxx.xxx on port 1883.
1475625675: Client device1 disconnected.
1475625675: New client connected from xxx.xxx.xxx.xxx as device1 (c0, k0, u'user1').
1475625675: Sending CONNACK to device1 (0, 0)
1475625677: Received PUBLISH from device1 (d0, q1, r0, m1, 'topic/data', ... (33 bytes))
1475625677: Sending PUBACK to device1 (Mid: 1)
1475625677: Sending PUBLISH to my-paho-client (d0, q0, r0, m0, 'topic/data', ... (33 bytes))
1475625677: Socket error on client my-paho-client, disconnecting.
1475625677: Received DISCONNECT from device1
...


What could be the problem? Any idea or suggestion?

Thanks in advance

Answer

If your code in the method "on_message" throws an exception and you do not catch it, you will be disconnected. Try uncommenting all statements except the print statements. Probably one of the following statements is throwing an exception.

 save_to_db(msg)
 post_data(msg.payload,value)
Comments