物聯網數據(ju)(ju)采集涉及到大(da)量設備接(jie)入、海量的時序數據(ju)(ju)傳輸,EMQ X 消(xiao)息(xi)中間(jian)件與 TDengine 大(da)數據(ju)(ju)平臺的組合技術棧完全能夠勝(sheng)任場(chang)景(jing)中的海量時間(jian)序列監(jian)測數據(ju)(ju)的傳輸、存儲和(he)計(ji)算。
數(shu)據(ju)(ju)入庫后(hou),往(wang)往(wang)需(xu)要其他(ta)方式如數(shu)據(ju)(ju)可視化系統(tong)(tong)將數(shu)據(ju)(ju)按照規則統(tong)(tong)計、展現出來,實現數(shu)據(ju)(ju)的(de)監控、指標統(tong)(tong)計等業務需(xu)求,以便充分發揮(hui)數(shu)據(ju)(ju)的(de)價值,TDengine 搭配(pei)開源軟件 Grafana 可以快速(su)搭建(jian)物聯網數(shu)據(ju)(ju)可視化平臺。
上述整(zheng)套方案無需代碼(ma)開發(fa),涉及的產品均能(neng)(neng)提供開源軟件、企業(ye)服務、云(yun)端 SaaS 服務不同層次的交付模(mo)式,能(neng)(neng)夠根據項(xiang)目需求實現免費版或(huo)企業(ye)版私有化落地以及云(yun)端部署。

方案介紹
EMQ X 簡介
是基于高并發的 Erlang/OTP 語言平臺開發,支持百萬級連接和分布式集群架構,發布訂閱模式的開源 MQTT 消息服務器。EMQ X 內置了大量開箱即用的功能,其開源版 EMQ X Broker 及企業版 EMQ X Enterprise 均(jun)支持通過規(gui)則引(yin)擎(qing)將設備消(xiao)息存(cun)儲到 TDengine。
TDengine 是什么
TDengine 是濤思數據專為物聯網、車聯網、工業互聯網、IT 運維等設計和優化的大數據平臺。除核心的快 10 倍以上的時序數據庫(Time-Series Database)功能(neng)外,還提供緩存(cun)、數據訂閱、流(liu)式計算等功能(neng),最大程(cheng)度減少研發和運維的復雜度,且核心代(dai)碼,包(bao)括集群功能(neng)全部開源。
TDengine 提供社區版、企業版和云服務版,安裝/使用教程詳見 TDengine 使用文檔 。
Grafana 簡介
Grafana 是一個跨平臺、開源的度量分析和可視化工具,可以查詢處理各類數據源中的數據,進行可視化的展示。它可以快速靈活創建的客戶端圖表,面板插件有許多不同方式的可視化指標和日志,官方庫中具有豐富的儀表盤插件,比如熱圖、折線圖、圖表等多種展示方式;支持 Graphite,TDengine、InfluxDB,OpenTSDB,Prometheus,Elasticsearch,CloudWatch和 KairosDB 等數(shu)據源(yuan),支(zhi)持數(shu)據項獨立/混合查詢展示;可以(yi)創建自定(ding)義告警(jing)規(gui)則并(bing)通知到(dao)其他消息處理服務或(huo)組件(jian)中。
業務場景
本文(wen)模擬物(wu)聯網環(huan)境數(shu)據(ju)采集場景,假設現有一定數(shu)據(ju)的環(huan)境數(shu)據(ju)采集點(dian),所有采集點(dian)數(shu)據(ju)均通過 MQTT 協(xie)議傳輸至(zhi)采集平臺(MQTT Publish),主題設計(ji)如(ru)下:
sensor/data
傳(chuan)感器(qi)發送(song)的數據格式為 JSON,數據包括傳(chuan)感器(qi)采集的溫度(du)、濕度(du)、噪聲(sheng)音(yin)量、PM10、PM2.5、二氧化(hua)硫(liu)、二氧化(hua)氮、一氧化(hua)碳、傳(chuan)感器(qi) ID、區(qu)域、采集時間等數據。
{
"temperature": 30,
"humidity" : 20,
"volume": 44.5,
"PM10": 23,
"pm25": 61,
"SO2": 14,
"NO2": 4,
"CO": 5,
"id": "10-c6-1f-1a-1f-47",
"area": 1,
"ts": 1596157444170
}
現在需要實時(shi)存儲(chu)以便在后續任意時(shi)間查看數據(ju),提出(chu)以下的需求:
- 每個設備按照每 5 秒鐘一次的頻率進行數據上報,數據庫需存儲每條數據以供后續回溯分析;
- 通過可視化系統查看任意區域、任意時間區間內的指標數據,如平均值、最大值、最小值。
環境準備
本文所用各(ge)個組件均(jun)有 Docker 鏡像,除 EMQ X 需(xu)要修改少數配(pei)置(zhi)為了便(bian)于操作(zuo)使用下載安裝外,TDengine 與 Grafana 均(jun)使用 Docker 搭建(jian)。
安裝包資源與使(shi)用教程(cheng)參照各(ge)自官網:
- EMQ X:
- TDengine:濤思數據官網
- Grafana:
安裝 EMQ X
如(ru)果您是 EMQ X 新手用戶,推薦通過 快(kuai)速(su)上手
訪問 下載適合您(nin)操作(zuo)系統的(de)安裝包,本文截稿時(shi) EMQ X 開源(yuan)版最新版本為(wei) v4.1.2,下載 zip 包的(de)啟(qi)動步(bu)驟如下 :
## 解壓下載好的安裝包
unzip emqx-macosx-v4.1.1.zip
cd emqx
## 以 console 模式啟動 EMQ X 方便調試
./bin/emqx console
啟動成功后瀏覽器訪問 //127.0.0.1:18083 訪問 EMQ X 管理控制臺 Dashboard,使用默認用戶名 admin和默認密碼public完成初次登錄。
安裝 TDengine
為了方便測(ce)試使用(yong)通過 Docker 進行安裝(需映射(she)網絡端口),也可以(yi)使用(yong)安裝包的方式(shi)進行安裝:
## 拉取并啟動容器
docker run -d --name tdengine -p 6030-6041:6030-6041 tdengine/tdengine:latest
## 啟動后檢查容器運行狀態
docker ps -a
Grafana 安裝
使用(yong)以下命令通(tong)過 Docker 安裝并啟動 Grafana:
docker run -d --name=grafana -p 3000:3000 grafana/grafana
啟動成功后瀏覽器訪問 //127.0.0.1:3000 訪問 Grafana 可視化面板,使用默認用戶名 admin 密碼 admin 完成(cheng)初次登錄,登錄后按照提示修改密碼使(shi)用新密碼登錄進(jin)入主界面。
配置 EMQ X 存儲數據到 TDengine
TDengine 創建數據庫與數據表
進入TDengine Docker 容器(qi):
docker exec -it tdengine bash
創建 “test” 數(shu)據庫:
taos
create database test;
創建 sensor_data 表,關于 TDengine 數據結構以及 SQL 命令參見 TAOS SQL:
use test;
CREATE TABLE sensor_data (
ts timestamp,
temperature float,
humidity float,
volume float,
PM10 float,
pm25 float,
SO2 float,
NO2 float,
CO float,
sensor_id NCHAR(255),
area TINYINT,
coll_time timestamp
);
配置 EMQ X 規則引擎
打開 EMQ X Dashboared,進入 規則引擎 -> 規則 頁面,點擊 創建 按(an)鈕進入創建頁面。
規則 SQL
規則 SQL 用于 EMQ X 消息以及事件篩選,以下 SQL 表示從 sensor/data 主(zhu)題(ti)篩選出 payload 數(shu)據:
SELECT
payload
FROM
"sensor/data"
使用SQL 測試功能,輸入測試(shi)數(shu)據(ju)進行(xing)篩選結果測試(shi),測試(shi)有結果且輸出內(nei)容如(ru)下,標明 SQL 編寫正確:
{
"payload": "{\"temperature\":30,\"humidity\":20,\"volume\":44.5,\"PM10\":23,\"pm2.5\":61,\"SO2\":14,\"NO2\":4,\"CO\":5,\"id\":\"10-c6-1f-1a-1f-47\",\"area\":1,\"ts\":1596157444170}"
}

響應動作
為支持各種不同類型平臺的開發,TDengine 提供符合 REST 設計標準的 API。通過 RESTful Connector 提供了(le)最(zui)簡單的連(lian)接方式(shi),即(ji)使用 HTTP 請求攜帶(dai)認證信息與要執行的 SQL 操作 TDengine。
使用 EMQ X 開源版中的發送到 Web 服務即可通過 RESTful Connector 寫入數據到 TDengine。即將到來的 EMQ X 企業版 4.1.1 版本將提供原(yuan)生更高(gao)性能的寫入(ru) Connector。
發(fa)送(song)到 Web 服務需要兩個數(shu)據,一個是關聯資(zi)源,另一個是消息(xi)內容模板。
- 關聯資源:HTTP 服務器配置信息,此處為 TDengine 的 RESTful Connector
- 消息內容模板:此處為攜帶數據的 INSERT SQL,注意我們應當在 SQL 中指定數據庫名,字符類型也要用單引號括起來, 消息內容模板為:
INSERT INTO test.sensor_data VALUES(
now,
${payload.temperature},
${payload.humidity},
${payload.volume},
${payload.PM10},
${payload.pm25},
${payload.SO2},
${payload.NO2},
${payload.CO},
'${payload.id}',
${payload.area},
${payload.ts}
)

創建過程
點擊響應動作下的添加按鈕,在彈出框內選擇發送數據到 Web 服務,點擊 新建資源 新建(jian)一(yi)個 WebHook 資(zi)源(yuan)。

資源類型選擇 Webhook,請求 URL 填寫 //127.0.0.1:6041/rest/sql,請求方法選擇 POST,還需添加 Authorization 請求頭作為認證信息。
Authorization 的值為 Basic + TDengine 的 {username}:{password} 經過 Base64 編碼之后的字符串, 例如 root:taosdata 編碼后為 cm9vdDp0YW9zZGF0YQ==,實際填入的值為:Basic cm9vdDp0YW9zZGF0YQ==
在(zai)響應動作創建(jian)頁(ye)面選(xuan)擇(ze)新建(jian)的資(zi)源,并填入消息模板內容即可。

生成模擬數據
以下腳本模(mo)擬(ni)了 10000 個設備在過去 24 小時(shi)內、每隔 5 秒(miao)鐘上報一條模(mo)擬(ni)數據并發(fa)送到 EMQ X 的場(chang)景。
- 總數據量:24 * 3600 / 5 * 10000 = 1.72 億條
- 消息 TPS:2000
讀者(zhe)安裝 Node.js ,按需修改配置(zhi)參數后可(ke)以通過以下命令啟動:
npm install mqtt mockjs --save --registry=//registry.npm.taobao.org
node mock.js
附:模擬生成數據并發(fa)送(song)到 EMQ X 代碼,請根(gen)據集群性(xing)能調整相(xiang)關(guan)參數
// mock.js
const mqtt = require('mqtt')
const Mock = require('mockjs')
const EMQX_SERVER = 'mqtt://localhost:1883'
const CLIENT_NUM = 10000
const STEP = 5000 // 模擬采集時間間隔 ms
const AWAIT = 5000 // 每次發送完后休眠時間,防止消息速率過快 ms
const CLIENT_POOL = []
startMock()
function sleep(timer = 100) {
return new Promise(resolve => {
setTimeout(resolve, timer)
})
}
async function startMock() {
const now = Date.now()
for (let i = 0; i < CLIENT_NUM; i++) {
const client = await createClient(`mock_client_${i}`)
CLIENT_POOL.push(client)
}
// last 24h every 5s
const last = 24 * 3600 * 1000
for (let ts = now - last; ts <= now; ts += STEP) {
for (const client of CLIENT_POOL) {
const mockData = generateMockData()
const data = {
...mockData,
id: client.clientId,
area: 0,
ts,
}
client.publish('sensor/data', JSON.stringify(data))
}
const dateStr = new Date(ts).toLocaleTimeString()
console.log(`${dateStr} send success.`)
await sleep(AWAIT)
}
console.log(`Done, use ${(Date.now() - now) / 1000}s`)
}
/**
* Init a virtual mqtt client
* @param {string} clientId ClientID
*/
function createClient(clientId) {
return new Promise((resolve, reject) => {
const client = mqtt.connect(EMQX_SERVER, {
clientId,
})
client.on('connect', () => {
console.log(`client ${clientId} connected`)
resolve(client)
})
client.on('reconnect', () => {
console.log('reconnect')
})
client.on('error', (e) => {
console.error(e)
reject(e)
})
})
}
/**
* Generate mock data
*/
function generateMockData() {
return {
"temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)),
"humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)),
"volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)),
"PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
"pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
"SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
"NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
"CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
"area": Mock.Random.integer(0, 20),
"ts": 1596157444170,
}
}
可視化配置
組件安裝完成,模擬數(shu)據寫入成功后,按照 Grafana 可視化(hua)界(jie)面的操作指引(yin),完成業務所需數(shu)據可視化(hua)配置。
添加數據源(Add data source)
添加數據源,即顯示的數據源信息。選取 TDengine 類型數(shu)據(ju)源,輸入連(lian)接參數(shu)進行配(pei)置,默認情況下,關鍵配(pei)置信息(xi)如(ru)下:

添加儀表盤(New Dashboard)
添加好數據源后,添加需要顯示的數據儀表盤信息。儀表盤為多個可視化面板的集合,點擊 New Dashboard 后,選擇 + Query 通過查詢來添加數(shu)據面板。
創建面板需要四個步驟,分別是 Queries(查詢)、Visualization(可視化)、General(圖表配置)、Alert(告警),創建時間
平均值面板
使用 Grafana 的可(ke)視化查詢構建(jian)工具,查詢出(chu)所有(you)設(she)備的平均值。
以下 SQL 按照指定(ding)時間段(duan)($form $to)、指定(ding)時間間隔(ge)($interval),查詢出數據中關鍵指標(biao)的(de)平均值:
select avg(temperature), avg(humidity), avg(volume), avg(PM10), avg(pm25), avg(SO2), avg(NO2), avg(CO) from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)
Visualization 默認不做更改,General 里面修改面板名稱為 歷史平均值,如果需要對業務進行監控告警,可以在 Alert 里編排告警規則,此(ci)處僅(jin)做可視化展示,不(bu)使用此(ci)功能。

完成創建后,點擊左上角返回按鈕,該 Dashboard 里成功添加一個數據面板。點擊頂部導航欄保存圖標,輸入(ru) Dashboard 名稱完成(cheng) Dashboard 的創建(jian)。
最大值、最小值面板
繼續點擊 Dashboard 的 Add panel 按鈕,添加最大值、最小值圖表。操作步驟同添加平均值,僅對查詢中 SELECT 統計方法字段做出調整,調整為 AVG 函數為 MAX 與 MIN:
select max(temperature), max(humidity), max(volume), max(PM10), max(pm25), max(SO2), max(NO2), max(CO), min(temperature), min(humidity), min(volume), min(PM10), min(pm25), min(SO2), min(NO2), min(CO) from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)

儀表盤效果
保存儀(yi)(yi)表(biao)盤(pan)(pan)(pan),拖拽調(diao)整每個(ge)數據(ju)面板大小、位置,最(zui)終得到一個(ge)視覺效(xiao)果較好(hao)的(de)(de)數據(ju)儀(yi)(yi)表(biao)盤(pan)(pan)(pan)。儀(yi)(yi)表(biao)盤(pan)(pan)(pan)右(you)上角可以選擇時(shi)(shi)間區間、自動(dong)刷新時(shi)(shi)間,此時(shi)(shi)設備持續(xu)發送數據(ju)采集數據(ju),儀(yi)(yi)表(biao)盤(pan)(pan)(pan)數據(ju)值會有(you)所變動(dong),實現了比較好(hao)的(de)(de)可視化效(xiao)果。

總結
至(zhi)此我(wo)們借(jie)助 EMQ X + TDengine 完(wan)(wan)成了物(wu)聯網數據(ju)(ju)傳(chuan)輸、存儲、展現整個流程的系(xi)統搭建,讀(du)者(zhe)可(ke)以(yi)了解到 EMQ X 豐(feng)富的拓展能力與 TDengine 完(wan)(wan)備的大數據(ju)(ju)平臺特性在物(wu)聯網數據(ju)(ju)采集(ji)中的應用。深入學習掌握(wo) Grafana 的其他功能后(hou),用戶可(ke)以(yi)定制出更完(wan)(wan)善的數據(ju)(ju)可(ke)視(shi)化(hua)乃(nai)至(zhi)監控告警系(xi)統。



























