本(ben)篇文(wen)章(zhang)(zhang)是“2024,我想和 TDengine 談談”征(zheng)文(wen)活動(dong)的(de)(de)三等獎作(zuo)品。文(wen)章(zhang)(zhang)從一個具體的(de)(de)業(ye)(ye)務(wu)場景出(chu)發,分(fen)析了(le)(le)企(qi)業(ye)(ye)在面對海量時序數據(ju)(ju)時的(de)(de)挑(tiao)戰,并提出(chu)了(le)(le)利用 TDengine 高效處理和存儲(chu)(chu)數據(ju)(ju)的(de)(de)方法,幫助企(qi)業(ye)(ye)解(jie)決在數據(ju)(ju)采(cai)集、存儲(chu)(chu)、分(fen)析等方面的(de)(de)痛(tong)點(dian)。通過這篇文(wen)章(zhang)(zhang),作(zuo)者不僅展示(shi)了(le)(le)自己(ji)對數據(ju)(ju)處理技術(shu)的(de)(de)理解(jie),還(huan)進(jin)一步闡(chan)釋了(le)(le)時序數據(ju)(ju)庫在行業(ye)(ye)中的(de)(de)潛力與(yu)應用價值,為(wei)讀者提供了(le)(le)很(hen)多(duo)實際的(de)(de)操作(zuo)思路和技術(shu)選型的(de)(de)參考。
在當前的物聯網行業,TDengine 已成為國內占有率最高的開源時序數據庫,并在開源物(wu)聯網(wang)平(ping)臺中(zhong)扮演著至關(guan)重(zhong)要的(de)角(jiao)色。接下來,我們將分幾個模(mo)塊詳細(xi)探討聯犀物(wu)聯網(wang)平(ping)臺如何與 TDengine 深度結(jie)合,并進一步提升 TDengine 的(de)擴(kuo)展性,推動其在物(wu)聯網(wang)場景中(zhong)的(de)應用潛(qian)力。
物模型如何結合 TDengine?
物模型介紹
現實世界由眾(zhong)多真實存(cun)在的(de)物(wu)(wu)理設(she)(she)備組成,我(wo)們(men)可以將這些設(she)(she)備稱之為“物(wu)(wu)”。物(wu)(wu)聯網的(de)目標是通過網絡將這些“物(wu)(wu)”連接在一(yi)起,并(bing)將其數字化(hua)(hua)為云(yun)端服(fu)務或(huo)資(zi)源,從(cong)而實現智能(neng)化(hua)(hua)應用(yong)。因此,在物(wu)(wu)聯網構(gou)建的(de)數字世界中,首先需(xu)要對(dui)“物(wu)(wu)”進行(xing)清(qing)晰、統一(yi)的(de)定(ding)義,明確其功(gong)(gong)能(neng)和(he)能(neng)提供的(de)服(fu)務資(zi)源。ICA 聯盟(meng)從(cong)產(chan)品層面(mian)(mian)對(dui)“物(wu)(wu)”進行(xing)了功(gong)(gong)能(neng)建模,提出了統一(yi)的(de)“物(wu)(wu)的(de)抽象模型”和(he)“物(wu)(wu)的(de)描(miao)述(shu)語(yu)言”(TSL,Things Specification Language)。物(wu)(wu)的(de)抽象模型描(miao)述(shu)了“設(she)(she)備是什么”以及“設(she)(she)備能(neng)做什么”,包(bao)括(kuo)物(wu)(wu)的(de)狀態(tai)、檔案信息(xi)和(he)功(gong)(gong)能(neng)定(ding)義等方(fang)面(mian)(mian)。

在(zai)(zai)物模型中,最為關鍵的是屬性(xing)(property)。以(yi)智(zhi)能(neng)電(dian)燈(deng)(deng)為例,其狀態具有二(er)元性(xing):開啟或關閉。用戶(hu)(hu)可以(yi)通過控制(zhi)操作輕(qing)松在(zai)(zai)這兩種狀態之(zhi)間切換。此外,一(yi)些智(zhi)能(neng)電(dian)燈(deng)(deng)還具備更多高級功(gong)能(neng),允許用戶(hu)(hu)根據個(ge)人需求調整亮度、顏色和色溫等參數。
智能設(she)備(bei)的(de)屬性(xing)通常具(ju)備(bei)讀寫能力(li),這意味著應用程序不僅可以(yi)(yi)讀取設(she)備(bei)的(de)當(dang)前(qian)狀態(tai),還可以(yi)(yi)修改屬性(xing)來調整(zheng)設(she)備(bei)行(xing)為。例(li)如(ru),在(zai)環境(jing)監測設(she)備(bei)中,應用程序可以(yi)(yi)讀取溫度和濕度數據,并根(gen)據需要(yao)調整(zheng)參數,以(yi)(yi)適應不同的(de)環境(jing)條(tiao)件。
物模型超級表與產品的關系
在聯(lian)犀(xi)物聯(lian)網平(ping)臺中,存在兩種物模(mo)(mo)型(xing)(xing):公(gong)共(gong)(gong)物模(mo)(mo)型(xing)(xing)和產(chan)品(pin)(pin)物模(mo)(mo)型(xing)(xing)。公(gong)共(gong)(gong)物模(mo)(mo)型(xing)(xing)是由(you)多個(ge)產(chan)品(pin)(pin)共(gong)(gong)同定義(yi)的共(gong)(gong)享物模(mo)(mo)型(xing)(xing),而(er)產(chan)品(pin)(pin)物模(mo)(mo)型(xing)(xing)則為每(mei)個(ge)特(te)定產(chan)品(pin)(pin)設備單(dan)獨定義(yi)。定義(yi)物模(mo)(mo)型(xing)(xing)時,通(tong)常會有以下兩種操作:
- 產品物模型:每類特定產品設備使用一個超級表來定義。
- 通用物模型:只創建一個超級表,后續的產品可以按需引入該物模型,產品下的設備則直接使用這個超級表。
TDengine 支持靈活的數據模(mo)(mo)型設(she)計,既(ji)可(ke)以使用多列(lie)模(mo)(mo)型,也可(ke)以選擇(ze)單列(lie)模(mo)(mo)型。多列(lie)模(mo)(mo)型相當于將多個字段(duan)存儲在(zai)同(tong)一張超(chao)級表(biao)中(zhong),通常在(zai)寫入(ru)和(he)存儲效率(lv)上表(biao)現(xian)較優。而在(zai)某些情況(kuang)下,例如數據采集點的種類和(he)數量經常變(bian)化時,單列(lie)模(mo)(mo)型則可(ke)能更為適用,因為它簡化了應(ying)用程序的設(she)計和(he)管(guan)理(li),允許獨立管(guan)理(li)和(he)擴展(zhan)每個物理(li)量的超(chao)級表(biao)。
綜上所(suo)述,TDengine 提供了(le)靈活的(de)數據(ju)模(mo)型選(xuan)(xuan)項,用(yong)(yong)戶可以根據(ju)具體(ti)需求和應用(yong)(yong)場景(jing)選(xuan)(xuan)擇(ze)最適合的(de)模(mo)型。無論是(shi)采(cai)用(yong)(yong)窄表設計、單列模(mo)型還(huan)是(shi)多列模(mo)型,最終目的(de)是(shi)為了(le)優化性能和簡化管理復雜性。
物模型和表分為三種對應關系
1. 普通類型: 根據(ju)物模型(xing)的數據(ju)類(lei)型(xing),映射(she)單列模式的超(chao)級表。物模型(xing)定義(yi)示例如下圖所示。

示(shi)例 SQL 如下:
CREATE STABLE IF NOT EXISTS model_custom_property_00b_int (ts timestamp,param BIGINT) TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));
相關參數說明:

param 物(wu)模型和(he) TDengine 表結(jie)構的定義對照如下:

下(xia)面是完整的(de)建表語句:
CREATE STABLE IF NOT EXISTS model_custom_property_00b_int (ts timestamp,param BIGINT) TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));
CREATE STABLE IF NOT EXISTS model_custom_property_00b_float (ts timestamp,param DOUBLE) TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));
CREATE STABLE IF NOT EXISTS model_custom_property_00b_enum (ts timestamp,param SMALLINT) TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));
CREATE STABLE IF NOT EXISTS model_custom_property_00b_timestamp (ts timestamp,param TIMESTAMP) TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));
CREATE STABLE IF NOT EXISTS model_custom_property_00b_bool (ts timestamp,param BOOL) TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));
CREATE STABLE IF NOT EXISTS model_custom_property_00b_string (ts timestamp,param BINARY(5000)) TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));
對應的每(mei)個(ge)設備創建的普通表(biao)如下:
CREATE TABLE IF NOT EXISTS device_property_00b_device1_int USING model_custom_property_00b_int TAGS('00b','device1','int');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_enum USING model_custom_property_00b_enum TAGS('00b','device1','enum');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_bool USING model_custom_property_00b_bool TAGS('00b','device1','bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_string USING model_custom_property_00b_string TAGS('00b','device1','string');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_float USING model_custom_property_00b_float TAGS('00b','device1','float');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_timestamp USING model_custom_property_00b_timestamp TAGS('00b','device1','timestamp');
2. 結構體類型:擁有(you)多(duo)(duo)個字段(duan)并將物模型進行(xing)整體抽象,映射為(wei)多(duo)(duo)列模式的超級表。物模型定義示(shi)例如下(xia)圖(tu)所示(shi)。

示例 SQL 如下:
CREATE STABLE IF NOT EXISTS model_custom_property_00b_struct (ts timestamp, latitude DOUBLE,longitude DOUBLE) TAGS (product_id BINARY(50),device_name BINARY(50),property_type BINARY(50));
相關參數說明:

對應的每個(ge)設備創建的普通表如下(xia):
CREATE TABLE IF NOT EXISTS device_property_00b_device1_struct USING model_custom_property_00b_struct TAGS('00b','device1','struct');
3. 數組類型: 數組類型在物聯網平臺中較為特殊,傳統平臺中的數組無法單獨操作某一位。例如,如果想單獨修改開關10的狀態,必須傳遞完整的數組(如:[0,1,1,0,1,0,1,1,0,1])來進行控制,這種方式并不符合現實世界的需求。聯犀則對數組進行了擴展,支持下角標訪問。比如,要修改開關 10 的狀態,只需傳遞 "switch.10": 1 即可。接下(xia)來,讓(rang)我們來看一下(xia)聯犀是如何處理這(zhe)種數據(ju)結構的(de)。物(wu)模型定義示例如下(xia)圖所示。

示(shi)例 SQL 如下:
CREATE STABLE IF NOT EXISTS model_custom_property_00b_switchg (ts timestamp,param BOOL) TAGS (product_id BINARY(50),device_name BINARY(50),_num BIGINT,property_type BINARY(50));
相關參數說明:

可以看到,數組類型的定義與簡單類型相似,區別在于它額外添加了一個 _num 字段(duan),用(yong)來標(biao)識(shi)數組(zu)的(de)(de)下標(biao)。這種設計(ji)的(de)(de)好處(chu)在(zai)于,即使數組(zu)長度達到 1000,依然(ran)只需定義一(yi)個超級表(biao)。雖然(ran)每個設備的(de)(de)普(pu)通表(biao)需要定義 1000 個,但由于是(shi)由一(yi)個超級表(biao)進行管(guan)理(li),整體(ti)管(guan)理(li)變得(de)更加簡便。接下來,我們來看看每個設備創建的(de)(de)普(pu)通表(biao)是(shi)什(shen)么樣的(de)(de)。
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_0 USING model_custom_property_00b_switchg TAGS('00b','device1',0,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_1 USING model_custom_property_00b_switchg TAGS('00b','device1',1,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_2 USING model_custom_property_00b_switchg TAGS('00b','device1',2,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_3 USING model_custom_property_00b_switchg TAGS('00b','device1',3,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_4 USING model_custom_property_00b_switchg TAGS('00b','device1',4,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_5 USING model_custom_property_00b_switchg TAGS('00b','device1',5,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_6 USING model_custom_property_00b_switchg TAGS('00b','device1',6,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_7 USING model_custom_property_00b_switchg TAGS('00b','device1',7,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_8 USING model_custom_property_00b_switchg TAGS('00b','device1',8,'bool');
CREATE TABLE IF NOT EXISTS device_property_00b_device1_switchg_9 USING model_custom_property_00b_switchg TAGS('00b','device1',9,'bool');
如何發揮 TDengine 的性能?
在(zai)設備(bei)上(shang)報(bao)信息時,如果在(zai)線(xian)設備(bei)達到 120 萬(wan),每個設備(bei)每 10 分(fen)鐘上(shang)報(bao)一次數據,那么(me)每秒鐘大(da)約會有(you) 2000 條(tiao)(tiao)消息需(xu)(xu)要處理。每條(tiao)(tiao)消息不僅需(xu)(xu)要入庫一條(tiao)(tiao)調試日(ri)志,還需(xu)(xu)要記(ji)錄一條(tiao)(tiao)屬性(xing)歷史數據。通過大(da)量(liang)性(xing)能測試和優化,我們發現,突破 2000 QPS 是(shi)非(fei)常具有(you)挑戰性(xing)的。深入分(fen)析了 TDengine 的體系架構(gou)后(hou),我們實施了以下優化措施:
- 從 HTTP 切換到 WebSocket:TDengine 從 3.x 版本開始支持 WebSocket,我們順勢升級,經過測試,性能和穩定性都有所提升,但在高并發的情況下,系統資源消耗依然較大。
- 改進同步操作為異步操作:通過將同步操作轉為異步處理,顯著提高了整體性能。
- SQL 批量插入優化:TDengine 支持單條 SQL 語句插入多條數據,甚至跨表插入,這大大提升了寫入性能。
官方語法如下:
INSERT INTO
tb_name
[USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)] [(field1_name, ...)]
VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path [tb2_name [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)] [(field1_name, ...)]
VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path ...];
INSERT INTO tb_name [(field1_name, ...)] subquery
官方示例如下:
INSERT INTO
d21001 USING meters TAGS ('California.SanFrancisco', 2) VALUES ('2021-07-13 14:06:34.630', 10.2, 219, 0.32) ('2021-07-13 14:06:35.779', 10.15, 217, 0.33)
d21002 USING meters (groupId) TAGS (2) VALUES ('2021-07-13 14:06:34.255', 10.15, 217, 0.33)
d21003 USING meters (groupId) TAGS (2) (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', 10.27, 0.31);
聯犀利用這一語法實(shi)現(xian)了異步操作,操作流程如下:
- 設備插入數據時,首先生成類似
d21003 USING meters (groupId) TAGS (2) (ts, current, phase) VALUES ('2021-07-13 14:06:34.255', 10.27, 0.31)格式的插入語句。 - 將生成的 SQL 語句放入 Golang 的 Channel 中。
- 多個異步入庫協程監聽 Channel,并從中取出 SQL 語句。當執行間隔超過 1 秒或 SQL 數量達到上限時,協程會將這些數據合并成完整的 SQL 批量插入數據庫。
由于設備上報數據的(de)整(zheng)個流程(cheng)不涉及磁盤(pan)操作,整(zheng)個過程(cheng)速(su)度非常快(kuai)。即使在未完全優化的(de)情況下(xia),經過測試,單機配置為 8 核 16GB 的(de)服務器,能夠穩定支持每(mei)秒 7000 并發請(qing)求,且保持毫秒級的(de)低(di)延遲,并無任何(he)錯(cuo)誤(wu)。

關鍵代碼如下:
type Td struct {
*sql.DB
}
type ExecArgs struct {
Query string
Args []any
}
var (
td = Td{}
once = sync.Once{}
insertChan = make(chan ExecArgs, 1000)
)
const (
asyncExecMax = 200 //異步執行sql最大數量
asyncRunMax = 40
)
func NewTDengine(DataSource conf.TSDB) (TD *Td, err error) {
once.Do(func() {
if DataSource.Driver == "" {
DataSource.Driver = "taosWS"
}
td.DB, err = sql.Open(DataSource.Driver, DataSource.DSN)
if err != nil {
return
}
td.DB.SetMaxIdleConns(50)
td.DB.SetMaxOpenConns(50)
td.DB.SetConnMaxIdleTime(time.Hour)
td.DB.SetConnMaxLifetime(time.Hour)
_, err = td.Exec("create database if not exists ithings;")
if err != nil {
return
}
for i := 0; i < asyncRunMax; i++ {
utils.Go(context.Background(), func() {
td.asyncInsertRuntime()
})
}
})
if err != nil {
logx.Errorf("TDengine 初始化失敗,err:%v", err)
}
return &td, err
}
func (t *Td) asyncInsertRuntime() {
r := rand.Intn(1000)
tick := time.Tick(time.Second/2 + time.Millisecond*time.Duration(r))
execCache := make([]ExecArgs, 0, asyncExecMax*2)
exec := func() {
if len(execCache) == 0 {
return
}
sql, args := t.genInsertSql(execCache...)
var err error
for i := 3; i > 0; i-- { //三次重試
_, err = t.Exec(sql, args...)
if err == nil {
break
}
}
if err != nil {
logx.Error(err)
}
execCache = execCache[0:0] //清空切片
}
for {
select {
case _ = <-tick:
exec()
case e := <-insertChan:
execCache = append(execCache, e)
if len(execCache) > asyncExecMax {
exec()
}
}
}
}
func (t *Td) AsyncInsert(query string, args ...any) {
insertChan <- ExecArgs{
Query: query,
Args: args,
}
}
func (t *Td) genInsertSql(eas ...ExecArgs) (query string, args []any) {
qs := make([]string, 0, len(eas))
as := make([]any, 0, len(eas))
for _, e := range eas {
qs = append(qs, e.Query)
as = append(as, e.Args...)
}
return fmt.Sprintf("insert into %s;", strings.Join(qs, " ")), as
}
TDengine 查詢
數據(ju)(ju)(ju)插入(ru)數據(ju)(ju)(ju)庫(ku)后,我們開始著手進(jin)行(xing)數據(ju)(ju)(ju)查詢和(he)展示。得益于(yu) TDengine 支持(chi)多種豐富的數據(ju)(ju)(ju)聚合方(fang)式,數據(ju)(ju)(ju)分析變得更加高效和(he)便捷。
ORM 設計
靈活(huo)的(de)(de)(de)查詢方式離不開(kai) ORM 框(kuang)(kuang)架(jia)的(de)(de)(de)支持。然而,TDengine 的(de)(de)(de)官方庫并未(wei)提(ti)供 ORM 框(kuang)(kuang)架(jia),開(kai)源社(she)區也沒有專門(men)為 TDengine 開(kai)發的(de)(de)(de) ORM 框(kuang)(kuang)架(jia)。為了(le)填補這一(yi)空白,聯犀基于知名 ORM 框(kuang)(kuang)架(jia) Squirrel(進行了(le)擴(kuo)展(zhan),以支持 TDengine 的(de)(de)(de)語法。以下是 ORM 的(de)(de)(de)示例:
func (d *DeviceDataRepo) getPropertyArgFuncSelect(
ctx context.Context,
filter msgThing.FilterOpt) (sq.SelectBuilder, error) {
schemaModel, err := d.getSchemaModel(ctx, filter.ProductID)
if err != nil {
return sq.SelectBuilder{}, err
}
p, ok := schemaModel.Property[filter.DataID]
if !ok {
return sq.SelectBuilder{}, errors.Parameter.AddMsgf("dataID:%s not find", filter.DataID)
}
var (
sql sq.SelectBuilder
)
if p.Define.Type == schema.DataTypeStruct {
sql = sq.Select("FIRST(ts) AS ts", d.GetSpecsColumnWithArgFunc(p.Define.Specs, filter.ArgFunc))
} else {
sql = sq.Select("FIRST(ts) AS ts", fmt.Sprintf("%s(param) as param", filter.ArgFunc))
}
if filter.Interval != 0 {
sql = sql.Interval("?a", filter.Interval) //TDengine特有語法
}
if len(filter.Fill) > 0 {
sql = sql.Fill(filter.Fill)//TDengine特有語法
}
return sql, nil
}
靈活的查詢接口
借助上述的 ORM 底層(ceng)實現,我(wo)們才得以實現靈活的查詢接口,下面是查詢接口示(shi)例。
請求參數:

回復參數:

TDengine 鏈路追蹤
最后,我們還需要解決鏈路追蹤的問題:我們需要記錄每個 SQL 的執行耗時,并能夠與業務鏈路 ID 打通,確保可以通過日志追溯到具體的 SQL 執行者和執行過程。雖然官方驅動較為底層,但它支持通過 context 進行傳遞。聯犀的鏈路追蹤同樣基于 context 傳遞,因此我們只需在執行 SQL 時記錄相關日志即可。無論是采用 HTTP、WebSocket 還是 CGo 連接方式,日志都統一記錄在 driver-go/taosWS/connection.go 文件中(zhong)。聯犀打印(yin)的日志如下:

結語
通(tong)過(guo)以上一(yi)系列工作(zuo),我(wo)們(men)(men)成功地實(shi)現了 TDengine 在建(jian)模、數據寫入、查詢和運(yun)維等各個環(huan)節(jie)的無縫銜接,并(bing)與物(wu)聯網業務完美(mei)融合。為(wei)了感謝(xie) TDengine 對我(wo)們(men)(men)產(chan)品(pin)的支持與幫助(zhu),我(wo)們(men)(men)特撰此(ci)文,希望能為(wei)相關從業人士提供一(yi)些(xie)借鑒與啟發。
附錄
聯犀開源地址:
TDengine ORM 定(ding)制:
TDengine 官方驅動定(ding)制:


























