Python+MQTT+MySQL实现获取边缘设备上行数据

248
0
2021年2月15日 09时00分

步骤

 


这个是我用来看EMQ服务器里面数据的工具,MQTTX,挺好用的哦

 

1

 

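首先,我们需要 paho-mqtt 和 pymysql 这两个第三方库(threading、time 是 Python 自带的标准库,无需安装),下载起来也很简单,直接百度一下安装方法即可(一般是 pip install paho-mqtt pymysql),安装好之后import进来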

 

23

 

以主题“weight_pub”为例
首先获取MQTT的数据,函数API使用示例如下:

 

4

 

## MQTT operations
def MQTTWeight():
    """Connect to the MQTT broker, subscribe to 'weight_pub' and run the network loop.

    Registers on_connect_weight and on_message_weight as the client's
    callbacks before connecting. The call to loop_forever() blocks, which is
    why the main script starts this function in its own thread.
    """
    # config._init()                  
    client_weight = mqtt.Client()
    client_weight.on_connect = on_connect_weight
    client_weight.on_message = on_message_weight
    # The host and port below are redacted placeholders; substitute your own broker address.
    client_weight.connect('**.**.***.***',****, 600) # 600 is the keepalive interval
    client_weight.subscribe('weight_pub', qos=0)
    client_weight.loop_forever() # keep the connection alive

 

下面这个回调函数会在客户端连接EMQ服务器之后被调用,并回显连接结果码(rc为0表示连接成功),告诉用户已经成功连上服务器;weight_pub主题的订阅则是在上面的MQTTWeight函数里完成的

 

5

 

def on_connect_weight(client, userdata, flags, rc):
    """Print the connection result code reported for the weight client.

    Installed as the on_connect callback of the weight MQTT client; only
    ``rc`` is used, the other arguments are accepted to match the callback
    signature.
    """
    prefix = "Connected with weight result code: "
    print(prefix + str(rc))

 

接下来在on_message_weight(client, userdata, msg) 回调函数中处理我们获取到的数据

 

6

 

def on_message_weight(client, userdata, msg):
    """Store the weight carried by an incoming MQTT message.

    Installed as the on_message callback of the weight client. The message
    is ignored unless the shared 'WriteEnable' flag equals 1. Otherwise the
    number contained in ``msg.payload`` is parsed with GetWeight() and
    written to the shared 'weight' entry; readings above 10 are reduced by
    2.75 first.

    Payloads that contain no parsable number are dropped and the previous
    'weight' value is kept.
    """
    if get_value('WriteEnable') != 1:
        return

    payload = msg.payload
    # Decode bytes explicitly: str(b'...') returns the repr ("b'...'"), and
    # escape sequences in that repr (e.g. \x01) would leak stray digits into
    # the parsed number.
    if isinstance(payload, (bytes, bytearray)):
        text = payload.decode('utf-8', errors='ignore')
    else:
        text = str(payload)

    try:
        weight = GetWeight(text)
    except ValueError:
        # Malformed payload: skip it instead of letting the exception escape
        # into the MQTT network loop.
        return

    if weight > 10:
        # Fixed offset taken from the original code; presumably a tare /
        # calibration value -- TODO confirm.
        weight -= 2.75
    # Publish the final value in one step so readers never observe the
    # uncorrected reading.
    set_value('weight', weight)

 

至此数据获取完成,接下来进行数据库记录修改操作,我已经在数据库里面建好记录了,所以只要修改对应记录里面的数据即可,python通过id寻找相应的记录。

 

7

 

## Database operations
def updateHandPart():
    """Write the pending heart-rate and blood-pressure readings to the database.

    Does nothing unless the shared 'heartRate' value is greater than 0.
    Otherwise the current 'heartRate', 'diastolicPressure' and
    'systolicPressure' values are written to the bodyhealthmonitor row whose
    id equals total_data['id'], and the three shared values are reset to 0.

    The values are reset only after the commit succeeded, and the cursor and
    connection are closed even when the database raises. Exceptions raised
    by the database driver propagate to the caller.
    """
    heart_rate = get_value('heartRate')
    if not heart_rate > 0:
        return

    # Snapshot everything first so a single statement writes a consistent set.
    args = (
        heart_rate,
        get_value('diastolicPressure'),
        get_value('systolicPressure'),
        total_data['id'],
    )
    # One UPDATE instead of three: same row, one round-trip.
    sql = ('UPDATE bodyhealthmonitor '
           'SET heartRate=%s, diastolicPressure=%s, systolicPressure=%s '
           'WHERE id = %s')

    conn = get_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute(sql, args)
        finally:
            cur.close()
        conn.commit()
    finally:
        conn.close()

    # Mark the readings as consumed only once they are safely stored.
    total_data['heartRate'] = 0
    total_data['diastolicPressure'] = 0
    total_data['systolicPressure'] = 0

 

下面这个函数用于连接数据库,每次调用都会返回一个新的数据库连接

 

8

 

def get_conn():
    """Open and return a new pymysql connection.

    A fresh connection is created on every call; the caller is expected to
    close it when done.
    """
    # host, port, user, passwd and db are redacted placeholders; fill in your own values.
    conn = pymysql.connect(host='*****', port=*****, user='*****', passwd='*****', db='*****')
    return conn

 

最后在主程序(if __name__ == '__main__')里面开线程获取数据,然后在主循环中处理数据并写入数据库

 

9

 


# ***** stands for values the user has to replace with their own data

# Latest readings, shared between the MQTT callbacks and the main loop.
total_data = {
    # While 1, on_message_weight stores incoming weights; the main loop sets
    # it back to 0 after calling update().
    'WriteEnable': 0,
    'weight': 0.00,
    'diastolicPressure': 0,
    'heartRate': 0,
    'systolicPressure': 0,
    'height': 0.00,
    'bloodOxygen': 0.00,
    'BMI': 0.00,
    'idealWeight': 0.00,
    'temperature': 0.00,
    # Row id used in the "WHERE id = %s" clause of the database updates.
    'id': 1000,
}


def set_value(name, value):
    """Store ``value`` under key ``name`` in the shared total_data dict."""
    total_data.update({name: value})

 
def get_value(name, defValue=None):
    """Return total_data[name], or ``defValue`` when the key is absent."""
    return total_data.get(name, defValue)

# Extract the weight value
def GetWeight(str):
    """Return the float formed by the ASCII digits and dots found in ``str``.

    Every other character (letters, quotes, whitespace, signs) is skipped,
    so "b'75.5'" and "weight: 75.5" both yield 75.5. Because signs are
    skipped as well, "-5" yields 5.0.

    Raises:
        ValueError: if the text contains no digit or dot, or if the kept
            characters do not form a valid number (e.g. "1.2.3").
    """
    # The parameter keeps its original name (it shadows the builtin) so that
    # existing keyword callers keep working; use a local alias below.
    text = str
    digits = ''.join(ch for ch in text if '0' <= ch <= '9' or ch == '.')
    if not digits:
        raise ValueError('no numeric characters in %r' % (text,))
    return float(digits)

def on_connect_weight(client, userdata, flags, rc):
    """Print the connection result code reported for the weight client.

    Installed as the on_connect callback of the weight MQTT client; only
    ``rc`` is used, the other arguments are accepted to match the callback
    signature.
    """
    prefix = "Connected with weight result code: "
    print(prefix + str(rc))

def on_message_weight(client, userdata, msg):
    """Store the weight carried by an incoming MQTT message.

    Installed as the on_message callback of the weight client. The message
    is ignored unless the shared 'WriteEnable' flag equals 1. Otherwise the
    number contained in ``msg.payload`` is parsed with GetWeight() and
    written to the shared 'weight' entry; readings above 10 are reduced by
    2.75 first.

    Payloads that contain no parsable number are dropped and the previous
    'weight' value is kept.
    """
    if get_value('WriteEnable') != 1:
        return

    payload = msg.payload
    # Decode bytes explicitly: str(b'...') returns the repr ("b'...'"), and
    # escape sequences in that repr (e.g. \x01) would leak stray digits into
    # the parsed number.
    if isinstance(payload, (bytes, bytearray)):
        text = payload.decode('utf-8', errors='ignore')
    else:
        text = str(payload)

    try:
        weight = GetWeight(text)
    except ValueError:
        # Malformed payload: skip it instead of letting the exception escape
        # into the MQTT network loop.
        return

    if weight > 10:
        # Fixed offset taken from the original code; presumably a tare /
        # calibration value -- TODO confirm.
        weight -= 2.75
    # Publish the final value in one step so readers never observe the
    # uncorrected reading.
    set_value('weight', weight)
## MQTT operations
def MQTTWeight():
    """Connect to the MQTT broker, subscribe to 'weight_pub' and run the network loop.

    Registers on_connect_weight and on_message_weight as the client's
    callbacks before connecting. The call to loop_forever() blocks, which is
    why the main script starts this function in its own thread.
    """
    # config._init()                  
    client_weight = mqtt.Client()
    client_weight.on_connect = on_connect_weight
    client_weight.on_message = on_message_weight
    # The host and port below are redacted placeholders; substitute your own broker address.
    client_weight.connect('**.**.***.***',****, 600) # 600 is the keepalive interval
    client_weight.subscribe('weight_pub', qos=0)
    client_weight.loop_forever() # keep the connection alive


def get_conn():
    """Open and return a new pymysql connection.

    A fresh connection is created on every call; the caller is expected to
    close it when done.
    """
    # host, port, user, passwd and db are redacted placeholders; fill in your own values.
    conn = pymysql.connect(host='*****', port=*****, user='*****', passwd='*****', db='*****')
    return conn

## Database operations
def updateHandPart():
    """Write the pending heart-rate and blood-pressure readings to the database.

    Does nothing unless the shared 'heartRate' value is greater than 0.
    Otherwise the current 'heartRate', 'diastolicPressure' and
    'systolicPressure' values are written to the bodyhealthmonitor row whose
    id equals total_data['id'], and the three shared values are reset to 0.

    The values are reset only after the commit succeeded, and the cursor and
    connection are closed even when the database raises. Exceptions raised
    by the database driver propagate to the caller.
    """
    heart_rate = get_value('heartRate')
    if not heart_rate > 0:
        return

    # Snapshot everything first so a single statement writes a consistent set.
    args = (
        heart_rate,
        get_value('diastolicPressure'),
        get_value('systolicPressure'),
        total_data['id'],
    )
    # One UPDATE instead of three: same row, one round-trip.
    sql = ('UPDATE bodyhealthmonitor '
           'SET heartRate=%s, diastolicPressure=%s, systolicPressure=%s '
           'WHERE id = %s')

    conn = get_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute(sql, args)
        finally:
            cur.close()
        conn.commit()
    finally:
        conn.close()

    # Mark the readings as consumed only once they are safely stored.
    total_data['heartRate'] = 0
    total_data['diastolicPressure'] = 0
    total_data['systolicPressure'] = 0

if __name__ == '__main__':
    # Threads started so far; the interrupt handler only touches these, so an
    # early Ctrl+C cannot hit a thread variable that was never assigned.
    workers = []
    try:
        # One subscriber thread per MQTT topic handler. MQTTHeight and
        # MQTTBoold (like update and stop_thread below) are expected to be
        # defined elsewhere in the script.
        for target in (MQTTWeight, MQTTHeight, MQTTBoold):
            # daemon=True replaces the deprecated Thread.setDaemon(True).
            worker = threading.Thread(target=target, daemon=True)
            worker.start()
            workers.append(worker)

        while True:
            time.sleep(1)  # poll the shared data once per second
            if get_value('WriteEnable') == 1:
                update()
                total_data['WriteEnable'] = 0
            else:
                updateHandPart()

    except KeyboardInterrupt:
        for worker in workers:
            stop_thread(worker)
        print('end')

(づ ̄3 ̄)づ╭❤~一键三连,这次一定(๑•̀ㅂ•́)و✧

QQ图片20210126200222

发表评论

后才能评论