树莓派+攀腾PMSA003-PM2.5传感器监测空气质量【含源码】

发布日期:

这个教程的目的是使用树莓派+攀腾PMSA003-PM2.5传感器,采集PM2.5数据,并推送到ThingsPanel,实现对空气质量的监测。

所需设备

1.树莓派一个
2.传感器一个:PMSA003,要买带转接头的,如下图
3.杜邦线4根


线路连接

除了电源和地线,数据收发接入到8和10,如图所示:





需要打开树莓派串口通信

使用如下搜索词,即可搜索到各种的打开方法

enable_uart=1

采集与推送代码

创建一个代码,名为pm.py,目的是采集pm2.5与pm10的数据发送到mqtt broker

这里有几个参数需要设置,你可以改成自己的

●推送服务器 :填写自己的

●端口:1883

●发送频率:5秒

●设备ID:随意,但是不能为空

●mqtt认证用户名:

●mqtt认证密码:为空

粘贴代码准备执行

起名pm.py即可

#!/usr/bin/env python
import serial
import time
import sys
import json
import datetime
import binascii
import paho.mqtt.client as mqtt

class pmsA003():
    def __init__(self, dev):
        self.serial = serial.Serial(dev, baudrate=9600, timeout=3)

    def __exit__(self, exc_type, exc_value, traceback):
        self.serial.close()

    def setIdel(self):
        idelcmd = b'\x42\x4d\xe4\x00\x00\x01\x73'
        ary = bytearray(idelcmd)
        self.serial.write(ary)

    def setNormal(self):
        normalcmd = b'\x42\x4d\xe4\x00\x01\x01\x74'
        ary = bytearray(normalcmd)
        self.serial.write(ary)

    def vertify_data(self):
        if not self.data:
            return False
        return True

    def read_data(self):
        while True:
            b = self.serial.read(1)
            if b == b'\x42':
                data = self.serial.read(31)
                if data[0] == 0x4d:
                    self.data = bytearray(b'\x42' + data)
                    if self.vertify_data():
                        return self._PMdata()

    def _PMdata(self):
        d = {}
        d['pm25'] = str(self.data[6] * 256 + self.data[7])
        d['pm10'] = str(self.data[4] * 256 + self.data[5])
        return d

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))

def on_disconnect(client, userdata, rc):
    print("Disconnected with result code "+str(rc))

if __name__ == '__main__':
    client = mqtt.Client()
    client.username_pw_set(username="42b09094-4f2a-ab85-5e6c-a18e8eb6947a", password="")
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.connect("dev.thingspanel.cn", 1883, 60)

    con = pmsA003('/dev/ttyAMA0')
    while True:
        d = con.read_data()
        payload = json.dumps({
            "pm25": d["pm25"],
            "pm10": d["pm10"]
        })
        client.publish("device/attributes", payload)
        time.sleep(5)



另外一个可选的代码,同时可推送到2个服务器

import time
import serial
import json
import paho.mqtt.client as mqtt
from adafruit_pm25.uart import PM25_UART
from digitalio import DigitalInOut, Direction, Pull

DEVICE_ID = "123456"  # Replace with your actual device ID

# MQTT setup for first broker
mqttc1 = mqtt.Client(client_id=DEVICE_ID)  # Added device ID as client ID
mqttc1.username_pw_set("864851d7-23b6-bddc-98b0-470be6cfd3fc", password="") # Set username and no password
mqttc1.connect("dev.thingspanel.cn", 1883, 60) # Connect to the first MQTT broker
mqttc1.loop_start()

# MQTT setup for second broker
mqttc2 = mqtt.Client()
mqttc2.connect("127.0.0.1", 1883, 60) # Connect to the second MQTT broker
mqttc2.loop_start()

reset_pin = None
uart = serial.Serial("/dev/ttyAMA0", baudrate=9600, timeout=0.25)
pm25 = PM25_UART(uart, reset_pin)

while True:
    time.sleep(1)

    try:
        aqdata = pm25.read()
    except RuntimeError:
        print("Unable to read from sensor, retrying...")
        continue

    # Create a JSON object with specified format
    data_json = json.dumps({
        "pm25": aqdata["pm25 env"],
        "pm10": aqdata["pm10 env"]
    })
    
    # Publish the data to a specific topic for first broker
    mqttc1.publish("device/attributes", payload=data_json, qos=0, retain=False)

    # Publish the data to a specific topic for second broker
    mqttc2.publish("pm25", payload=data_json, qos=0, retain=False)

配置后台自动运行

crontab -e

加入代码到最后一行,意思是重启后自动启动,确保任何时候都正常工作

@reboot /usr/bin/python /你的地址/pm.py

重启树莓派即可开始正常工作,即可在平台上看到数据。


Github
Gitee
微信交流群
QQ交流群
商务咨询
北京极益科技有限公司 版权所有 ICP:京ICP备15045763号-12