树莓派+攀腾PMSA003-PM2.5传感器监测空气质量【含源码】
发布日期:
这个教程的目的是使用树莓派+攀腾PMSA003-PM2.5传感器,采集PM2.5数据,并推送到ThingsPanel,实现对空气质量的监测。
所需设备
1.树莓派一个
2.传感器一个:PMSA003,要买带转接头的,如下图
3.杜邦线4根
线路连接
除了电源和地线,数据收发接入到8和10,如图所示:
需要打开树莓派串口通信
使用如下搜索词,即可搜索到各种的打开方法
enable_uart=1
采集与推送代码
创建一个代码,名为pm.py,目的是采集pm2.5与pm10的数据发送到mqtt broker
这里有几个参数需要设置,你可以改成自己的
●推送服务器 :填写自己的
●端口:1883
●发送频率:5秒
●设备ID:随意,但是不能为空
●mqtt认证用户名:
●mqtt认证密码:为空
粘贴代码准备执行
起名pm.py即可
#!/usr/bin/env python
import serial
import time
import sys
import json
import datetime
import binascii
import paho.mqtt.client as mqtt
class pmsA003():
def __init__(self, dev):
self.serial = serial.Serial(dev, baudrate=9600, timeout=3)
def __exit__(self, exc_type, exc_value, traceback):
self.serial.close()
def setIdel(self):
idelcmd = b'\x42\x4d\xe4\x00\x00\x01\x73'
ary = bytearray(idelcmd)
self.serial.write(ary)
def setNormal(self):
normalcmd = b'\x42\x4d\xe4\x00\x01\x01\x74'
ary = bytearray(normalcmd)
self.serial.write(ary)
def vertify_data(self):
if not self.data:
return False
return True
def read_data(self):
while True:
b = self.serial.read(1)
if b == b'\x42':
data = self.serial.read(31)
if data[0] == 0x4d:
self.data = bytearray(b'\x42' + data)
if self.vertify_data():
return self._PMdata()
def _PMdata(self):
d = {}
d['pm25'] = str(self.data[6] * 256 + self.data[7])
d['pm10'] = str(self.data[4] * 256 + self.data[5])
return d
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
def on_disconnect(client, userdata, rc):
print("Disconnected with result code "+str(rc))
if __name__ == '__main__':
client = mqtt.Client()
client.username_pw_set(username="42b09094-4f2a-ab85-5e6c-a18e8eb6947a", password="")
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.connect("dev.thingspanel.cn", 1883, 60)
con = pmsA003('/dev/ttyAMA0')
while True:
d = con.read_data()
payload = json.dumps({
"pm25": d["pm25"],
"pm10": d["pm10"]
})
client.publish("device/attributes", payload)
time.sleep(5)
另外一个可选的代码,同时可推送到2个服务器
import time
import serial
import json
import paho.mqtt.client as mqtt
from adafruit_pm25.uart import PM25_UART
from digitalio import DigitalInOut, Direction, Pull
DEVICE_ID = "123456" # Replace with your actual device ID
# MQTT setup for first broker
mqttc1 = mqtt.Client(client_id=DEVICE_ID) # Added device ID as client ID
mqttc1.username_pw_set("864851d7-23b6-bddc-98b0-470be6cfd3fc", password="") # Set username and no password
mqttc1.connect("dev.thingspanel.cn", 1883, 60) # Connect to the first MQTT broker
mqttc1.loop_start()
# MQTT setup for second broker
mqttc2 = mqtt.Client()
mqttc2.connect("127.0.0.1", 1883, 60) # Connect to the second MQTT broker
mqttc2.loop_start()
reset_pin = None
uart = serial.Serial("/dev/ttyAMA0", baudrate=9600, timeout=0.25)
pm25 = PM25_UART(uart, reset_pin)
while True:
time.sleep(1)
try:
aqdata = pm25.read()
except RuntimeError:
print("Unable to read from sensor, retrying...")
continue
# Create a JSON object with specified format
data_json = json.dumps({
"pm25": aqdata["pm25 env"],
"pm10": aqdata["pm10 env"]
})
# Publish the data to a specific topic for first broker
mqttc1.publish("device/attributes", payload=data_json, qos=0, retain=False)
# Publish the data to a specific topic for second broker
mqttc2.publish("pm25", payload=data_json, qos=0, retain=False)
配置后台自动运行
crontab -e
加入代码到最后一行,意思是重启后自动启动,确保任何时候都正常工作
@reboot /usr/bin/python /你的地址/pm.py
重启树莓派即可开始正常工作,即可在平台上看到数据。