在 8 月 13 日(ri)的(de) TDengine 開發者大會上,TDengine 引(yin)(yin)擎開發工程(cheng)師劉(liu)繼聰帶(dai)來題為《TDengine 3.0 流處理(li)引(yin)(yin)擎——單機(ji)每秒百萬記錄,毫秒級延(yan)遲(chi)》的(de)主題演講,詳細闡述(shu)了 TDengine 3.0 全新的(de)流式計(ji)算引(yin)(yin)擎技(ji)術實現原理(li)。本(ben)文即根據此演講整理(li)而成。
點擊【這里】查看完整演講視頻
全新的流處理引擎以及數據訂閱
流是(shi)(shi)源(yuan)源(yuan)不斷寫(xie)入 TDengine 中(zhong)的(de)數(shu)據,但作為原始數(shu)據的(de)流無法產生(sheng)更多價值。我們(men)需要(yao)對流進行過濾、分流、變(bian)(bian)換(huan),產生(sheng)新的(de)流寫(xie)入 TDengine 或推送給用(yong)戶程(cheng)序,這就是(shi)(shi) TDengine 流處(chu)理引擎的(de)使(shi)命(ming)。在 TDengine 中(zhong)創建一(yi)個(ge)流,即是(shi)(shi)定義一(yi)個(ge)變(bian)(bian)換(huan),并在流的(de) source 中(zhong)數(shu)據到來(lai)時按照一(yi)定規則(ze)自動觸發,并寫(xie)入 sink。

我(wo)們首先來(lai)看一下 TDengine 中流的幾個例子:
- 作為標量函數的流,對超級表中的每個子表各自運算,寫入新的子表
CREATE STREAM power_stream
INTO power_stream_output_stb AS
SELECT
ts,
concat_ws(".", location, TBNAME) AS meter_location,
current * voltage * cos(phase) AS active_power,
current * voltage * sin(phase) AS reactive_power
FROM meters
WHERE voltage > 100
PARTITION BY TBNAME
通過 CREATE STREAM 定義了(le)流(liu)的名稱(cheng),緊接著是(shi)(shi)(shi)定義流(liu)的 sink,在 INTO 后面緊跟的是(shi)(shi)(shi)一個(ge)超(chao)級表(biao),這個(ge)超(chao)級表(biao)在創(chuang)建流(liu)時會(hui)被同時自動(dong)創(chuang)建出(chu)來。緊接著,是(shi)(shi)(shi)一個(ge) SELECT 查(cha)詢,定義了(le)流(liu)的變換(huan)。這里(li)是(shi)(shi)(shi)一個(ge)帶(dai)過濾的標量函數計算。
創建流(liu)(liu)語(yu)句的(de)最后是(shi)一個 PARTITION 語(yu)句:PARTITION BY TBNAME,表(biao)(biao)(biao)示流(liu)(liu)的(de)分(fen)(fen)區(qu)如何劃(hua)分(fen)(fen),不同(tong)分(fen)(fen)區(qu)的(de)數(shu)據(ju)將被(bei)寫入超級表(biao)(biao)(biao)中的(de)不同(tong)子表(biao)(biao)(biao)。
- 以滑動窗口,聚合超級表中所有子表的流
CREATE STREAM current_stream
TRIGGER AT_ONCE
INTO current_stream_output_stb AS
SELECT
_wstart as start,
_wend as end,
max(current) as max_current
FROM meters
WHERE voltage <= 220
INTEVAL (5s) SLIDING (1s);
我們來看第二個例子,這(zhe)個例子中(zhong)是(shi)一(yi)個滑(hua)動窗口的(de)聚合。在 create stream 之后,通過 TRIGGER AT_ONCE 指定了流的(de)觸(chu)發模式,AT_ONCE 模式表示(shi)當數(shu)據寫(xie)入,立即觸(chu)發計(ji)算并(bing)推送結果(guo),而無需等待窗口關閉。
- 以會話窗口(session window),對超級表中的子表各自聚合
CREATE STREAM current_stream
TRIGGER WINDOW_CLOSE
WATERMARK 60s
INTO current_stream_output_stb AS
SELECT
_wstart as start,
_wend as end,
avg(current) as avg_current
FROM meters
SESSION(ts, 30s)
PARTITION BY TBNAME
第三(san)個例子是(shi)會話窗(chuang)口(kou)(session window)。流除了(le)支持翻滾(gun)窗(chuang)口(kou)(tumble window)和滑動(dong)窗(chuang)口(kou)(sliding window, hop window)外,還(huan)支持會話窗(chuang)口(kou)和狀態窗(chuang)口(kou)(state window),窗(chuang)口(kou)的定(ding)義(yi)與 TDengine 中(zhong)的普通(tong)查詢完全一致(zhi)。在(zai)這里(li),我們再次(ci)引入了(le) PARTITION BY TBNAME 子句,表示(shi)每個子表獨(du)立計算會話窗(chuang)口(kou)并將結果寫(xie)入目的表。
從上(shang)面三種觸發方(fang)式的(de)具體實現中我們可以看到,創建流有(you)幾(ji)個重要的(de)元素:
- 流的源表及其變換:這是一個 SELECT 查詢
- 流的目的表:這是一個自動創建的超級表
- 流的分區:根據不同的 PARTITION 自動創建子表,并寫入對應的子表中去。
定義一個流就是定義一個數據(ju)變換,數據(ju)的變換與 ETL 在 TDengine 內部(bu)完成。

在上(shang)面的(de)(de)(de)例(li)(li)子(zi)中,實時(shi)流(liu)推(tui)送到了 TDengine 中,卻并沒有直接推(tui)送到應(ying)用。這是(shi)因(yin)為,全新(xin)的(de)(de)(de)范式(shi)帶(dai)來(lai)(lai)了全新(xin)的(de)(de)(de)解決(jue)方(fang)案,但是(shi)也會(hui)帶(dai)來(lai)(lai)應(ying)用改(gai)造的(de)(de)(de)額(e)外開銷。如果應(ying)用需要去處理 TDengine 推(tui)送的(de)(de)(de)實時(shi)流(liu),那么(me)將(jiang)帶(dai)來(lai)(lai)更加高昂的(de)(de)(de)成本;反之(zhi)它可以保(bao)留原有的(de)(de)(de) Query 獲取結(jie)果模式(shi),例(li)(li)如,將(jiang)復雜(za)耗時(shi)的(de)(de)(de)查詢改(gai)為對流(liu)聚合(he)結(jie)果的(de)(de)(de)簡單(dan)查詢,最大程度地(di)利用流(liu)帶(dai)來(lai)(lai)的(de)(de)(de)便利。
但有些時(shi)(shi)候應(ying)用仍然需要去(qu)低延遲地獲取數據,比如(ru)在實現監(jian)控報警與異常檢測時(shi)(shi),這種情(qing)況下流就需要真正到達應(ying)用, TDengine 提(ti)供了數據訂(ding)閱功能來(lai)滿足這種需求(qiu)。
從流(liu)的角度,我們也(ye)可以重新理(li)解 TDengine 提供的數(shu)據訂閱功能,數(shu)據訂閱的目的是將寫入的數(shu)據以流(liu)的方式(shi)推送到消費者中(zhong)去(qu),由于(yu)應用的處理(li)能力有限,流(liu)需要被持(chi)久化并(bing)按需讀取(qu),對(dui)應到 Kafka 中(zhong)就(jiu)是消息可以無(wu)限堆(dui)積的特性。

TDengine 中(zhong)(zhong)落盤的(de)數(shu)據(ju)(ju)流就是 WAL,我(wo)們會把寫入(ru)的(de)數(shu)據(ju)(ju)物化(hua),即持(chi)久(jiu)化(hua)落盤下去,這也就是消(xiao)息隊列中(zhong)(zhong)的(de)存(cun)(cun)儲(chu)。從這點出發,我(wo)們將 WAL 改造成一個(ge)真正的(de)存(cun)(cun)儲(chu)引擎,提供靈(ling)活可配置的(de)刪除與文(wen)件切換策略,并建立索(suo)引,再(zai)對(dui)接(jie)查(cha)詢引擎。
數(shu)據(ju)(ju)訂(ding)(ding)閱(yue)使用 CREATE TOPIC 語法從(cong) WAL 中產生數(shu)據(ju)(ju)流,提(ti)供(gong)類似消(xiao)息隊(dui)列的(de)接口(kou),既(ji)可以訂(ding)(ding)閱(yue)用戶創建的(de)表(biao),又可以訂(ding)(ding)閱(yue)流的(de) SINK 表(biao)。在具(ju)體(ti)操(cao)作時(shi),標(biao)量函數(shu)、過濾(lv)可以從(cong) WAL 中提(ti)取數(shu)據(ju)(ju)并(bing)變換(huan),這樣的(de)變換(huan)其實就是(shi)產生一個(ge)實時(shi)流,然后將其推送到應用中去。
流引擎實現原理
前面講(jiang)解了 TDengine 的(de)流(liu)引擎是(shi)(shi)什(shen)么,以及(ji)數據訂閱與流(liu)之(zhi)間的(de)關系,下面我們(men)來看(kan)一下 TDengine 的(de)流(liu)處(chu)理如何(he)實現。當然,這里面細節(jie)很多(duo),內容很雜,由(you)于(yu)時間關系,我只(zhi)會(hui)挑出 3 個最(zui)重要的(de)部分(fen),也是(shi)(shi)我覺得最(zui)有意思的(de)部分(fen)來講(jiang)解,那就(jiu)是(shi)(shi)“事件(jian)驅動”、“增量計(ji)算”與“亂序處(chu)理”。
事件驅動
我們前面已經(jing)講過,流式(shi)計(ji)算和連續(xu)查詢最大(da)的(de)(de)區別之一(yi)在(zai)于,流式(shi)計(ji)算能夠支(zhi)持事件驅(qu)動,也就(jiu)是(shi)每一(yi)條數據的(de)(de)到來(lai)都會(hui)觸發計(ji)算,這樣的(de)(de)特性(xing)讓我們能夠對標量函數進行計(ji)算,從而實現數據的(de)(de)清洗與(yu)預處理(li);并且能夠對窗(chuang)(chuang)口(kou)(kou)聚合提供 AT_ONCE 觸發模式(shi),不再需要等待(dai)窗(chuang)(chuang)口(kou)(kou)關閉,從而支(zhi)持會(hui)話(hua)窗(chuang)(chuang)口(kou)(kou)與(yu)狀態窗(chuang)(chuang)口(kou)(kou)。事件驅(qu)動執行的(de)(de)承(cheng)擔者是(shi) Stream Task,我們先來(lai)看(kan)一(yi)下 Stream Task 如何部署(shu)。

第(di)一個(ge)例子,是標量函數與 partition by tbname 的聚合,來自(zi) source DB 的每一個(ge) vnode 都各自(zi)的進行聚合,并且分發到 target DB 中,由 target DB 中的 Stream Task 負責將數據寫(xie)入對應(ying)子表(biao)。
如圖所示,流(liu)是可(ke)以跨(kua)越 DB 的(de),而不同(tong) DB 代(dai)表(biao)不同(tong)數(shu)據保(bao)存(cun)生命周期,Source DB 的(de) 3 個 Stream Task 代(dai)表(biao)著(zhu)部署在(zai)其中的(de)三(san)個 vnode。在(zai)進行標(biao)量計算(suan)、partition by tbname 聚(ju)合(he)時,數(shu)據可(ke)以不經(jing)過聚(ju)合(he)節點,直接在(zai) Source DB 的(de) vnode 里經(jing)過 Stream Task 完成聚(ju)合(he),再發(fa)送到(dao) target DB。

超級表(biao)聚(ju)合(he)(he)第二個(ge)例子(zi),是(shi)一(yi)(yi)個(ge)分布(bu)式(shi)的聚(ju)合(he)(he),將超級表(biao)中(zhong)所有子(zi)表(biao)聚(ju)合(he)(he)到(dao)一(yi)(yi)起。它需要部署(shu)一(yi)(yi)個(ge)聚(ju)合(he)(he) Stream Task,來匯總來自(zi)各個(ge) vnode 的數(shu)據。在具體實(shi)現上,數(shu)據在源 vnode 處進行一(yi)(yi)級聚(ju)合(he)(he),一(yi)(yi)級聚(ju)合(he)(he)的數(shu)據會被推送(song)到(dao)二級節點進行二級聚(ju)合(he)(he),而(er)聚(ju)合(he)(he)的結果(guo)則根據 trigger 模式(shi)按需推送(song)到(dao) target DB。
那么,這兩級(ji)聚合(he)分別是指什么呢?在(zai)(zai)增量計算部分,我(wo)們會(hui)詳細(xi)講解。不過(guo)在(zai)(zai)此(ci)之前,我(wo)們先來(lai)放大看一看 Stream Task 內部的具(ju)體結構。

每一(yi)個流都由多個 Stream Task 構成(cheng),而每個 Stream Task 都包含了一(yi)個 Input Queue 與 Output Queue。在執行時,流式計(ji)算(suan)(suan)框架會(hui)將 Output Queue 中的數據(ju)分發(fa)到下游的 Stream Task 中去,并通知流的執行調度(du)器,調度(du)空閑的流線程(cheng)觸發(fa)計(ji)算(suan)(suan)。
而(er) Stream Task 內部,具(ju)體(ti)計算(suan)的(de)執行(xing)者是一系列有狀態(tai)的(de)流(liu)(liu)算(suan)子。在(zai)(zai)創建流(liu)(liu)時(shi),SQL 被解析成(cheng)語(yu)法樹,planner 將(jiang)語(yu)法樹拆分成(cheng)多(duo)個的(de) pipeline,而(er)每個 pipeline 就是多(duo)個串聯(lian)起(qi)來(lai)的(de)流(liu)(liu)算(suan)子 。我(wo)們可以看到,從計劃的(de)層面來(lai)看,流(liu)(liu)計劃的(de)最大的(de)區別在(zai)(zai)于去(qu)掉了 Exchange Operator,將(jiang)所(suo)有的(de) pipeline 單機化(hua),pipeline 與 pipeline 之間(jian)采用 push 模(mo)式進行(xing)數(shu)據交換。在(zai)(zai) push 模(mo)式下,我(wo)們能夠對語(yu)法樹自底向上地(di)執行(xing),并(bing)逐級觸(chu)發,這樣(yang)不僅最大程度(du)地(di)減少(shao)了流(liu)(liu)執行(xing)過程中阻(zu)塞,并(bing)且減少(shao)了無效(xiao)的(de)執行(xing)調度(du),因(yin)為(wei)流(liu)(liu)不再需要當事件到來(lai)時(shi)首先調度(du)起(qi)父節點(dian)的(de) Stream Task,向子節點(dian)的(de) Stream Task 拉取數(shu)據。
而流算子(zi)(zi)是有狀態(tai)的算子(zi)(zi),在 Stream Task 中(zhong)有流的狀態(tai)下存儲后端(duan),當內存中(zhong)的狀態(tai)數據(ju)過大時,會溢(yi)出到硬盤。
有狀態增量計算
流計算(suan)根據函數的(de)(de)不同性(xing)質,可以分成很多(duo)種,比如(ru) invertible、holistic 等等,這里我們只(zhi)討論對于 incremental 的(de)(de)計算(suan)是如(ru)何(he)實現的(de)(de)。

我們沒有必(bi)要先引入一系(xi)列復雜(za)的數學(xue)公式或代數結構,只(zhi)需用(yong)一個(ge)最簡(jian)單(dan)的計算(suan)平均值的例(li)子(zi)來展示增量計算(suan)的過程。
對于左邊的(de)(de)圖,數(shu)(shu)(shu)據 1、2、3,平(ping)均值計(ji)(ji)算為 2,當(dang)新(xin)(xin)的(de)(de)數(shu)(shu)(shu)據到來(lai)時,假(jia)設為 4,那(nei)么 4 是(shi)無法與結果 2 進行(xing)增量(liang)計(ji)(ji)算的(de)(de)。如(ru)果要增量(liang)計(ji)(ji)算,那(nei)么我們需(xu)要提取(qu)出(chu)一個狀(zhuang)(zhuang)態(tai)向(xiang)量(liang),記(ji)錄(lu)數(shu)(shu)(shu)據的(de)(de) Sum 與 Count。狀(zhuang)(zhuang)態(tai)向(xiang)量(liang)被維持在算子的(de)(de)狀(zhuang)(zhuang)態(tai)存(cun)儲中,當(dang)新(xin)(xin)的(de)(de)數(shu)(shu)(shu)據到來(lai),新(xin)(xin)數(shu)(shu)(shu)據被直(zhi)接映射成(cheng)(cheng)狀(zhuang)(zhuang)態(tai)空(kong)間中的(de)(de)向(xiang)量(liang),而狀(zhuang)(zhuang)態(tai)空(kong)間中的(de)(de)向(xiang)量(liang)定義了(le)合成(cheng)(cheng)運算,最(zui)終得到一個新(xin)(xin)的(de)(de)狀(zhuang)(zhuang)態(tai)向(xiang)量(liang)。當(dang)需(xu)要得到最(zui)終結果時,根據狀(zhuang)(zhuang)態(tai)向(xiang)量(liang)計(ji)(ji)算出(chu)最(zui)終結果,如(ru)上圖的(de)(de) 10 / 4 = 2.5。
我們(men)抽象一下上述(shu)過程,將上述(shu)的“將原始數據(ju)(ju)映射到狀態空間中(zhong)”定義為 Lift,“將狀態空間中(zhong)的向量(liang)(liang)(liang)合(he)成”定義為 Combine,“將從(cong)狀態向量(liang)(liang)(liang)中(zhong)提取結果”定義為 Lower,就得(de)到了增(zeng)量(liang)(liang)(liang)計(ji)算(suan)的 3 個基本原語:Lift、Combine 與 Lower。狀態向量(liang)(liang)(liang)占用的內存是恒(heng)定的,當數據(ju)(ju)被聚合(he)之后會(hui)被釋放,因此,內存的占用不再與數據(ju)(ju)量(liang)(liang)(liang)正相關,而只與開啟的窗口(kou)數據(ju)(ju)相關,因此能夠在實現大窗口(kou)下的高吞吐量(liang)(liang)(liang)的聚合(he),而不會(hui)導致內存的暴漲。

對于上述過程(cheng)以及(ji)實際(ji)常用函數(shu)如(ru)何拆分(fen),大家可以參考 VLDB 2015 年 General incremental sliding-window aggregation [1] 這篇(pian)論文,以及(ji)一些(xie)后續的工作(zuo)。

這時,我們就能夠明白前(qian)面(mian)講到的分布式(shi)的兩級(ji)聚合到底是什么了:
- 批量插入的數據,會在數據插入的 vnode 首先執行 Lift 與 Combine 操作
- 對于跨多個 vnode 的聚合,會在隨機選擇一個 vnode 部署聚合 stream task,將第一級聚合的 state 再次 combine
- 根據 trigger 模式的不同按需執行 lower
- 兩級增量聚合降低了數據傳輸的量,將 CPU 密集的計算分散到各個節點中去
亂序處理
為了實現(xian)在(zai)亂(luan)序等多種場景下的(de)正確性(xing),TDengine 3.0 中的(de)流(liu)式計算(suan)采用了以(yi)事(shi)件時(shi)間為基準的(de)處理(li)模式,而(er) Watermark 即是對(dui)于亂(luan)序容忍的(de)上界,想要理(li)解亂(luan)序數(shu)據(ju)的(de)處理(li),我們首先需要了解 Watermark。

在上(shang)圖中,縱軸(zhou)表示墻上(shang)時鐘,即真實時間。橫軸(zhou)表示對應 T1、T2、T3 時刻到達 TDengine 中的(de)數據(ju)。藍(lan)色點表示最(zui)新插(cha)入(ru)的(de)數據(ju),Watermark 就是沿著這(zhe)個時間軸(zhou)往過去(qu)的(de)方向去(qu)推移,用最(zui)后(hou)到達的(de)事件時間減去(qu) Watermark 時間,得到時間 T = latest event time – watermark 。所有結束時間早于 T 的(de)窗口(kou)都會被關閉(bi)(bi)。這(zhe)些窗口(kou)已經(jing)超出了亂序容忍的(de)上(shang)界,我們(men)認(ren)為它們(men)不會再有數據(ju)插(cha)入(ru),可以安全關閉(bi)(bi)。
觸(chu)發(fa)(fa)模(mo)式(shi)是 WINDOW_CLOSE、MAX_DELAY 的(de)(de)(de)數(shu)據(ju)(ju)這(zhe)時(shi)會(hui)被推(tui)送(song)。而在(zai) AT_ONCE 模(mo)式(shi)下,窗(chuang)(chuang)口(kou)關閉與結果(guo)推(tui)送(song)無關,只與內存釋放有(you)關,因(yin)為內存是有(you)限的(de)(de)(de),而數(shu)據(ju)(ju)流是無界的(de)(de)(de)。因(yin)此,對于 WINDOW CLOSE 或(huo) MAX DELAY 觸(chu)發(fa)(fa)模(mo)式(shi),Watermark 的(de)(de)(de)選擇是結果(guo)的(de)(de)(de)實時(shi)性(xing)(xing)與正(zheng)確性(xing)(xing)之間(jian)的(de)(de)(de) trade-off。在(zai)數(shu)據(ju)(ju)可能(neng)有(you)亂(luan)序(xu)的(de)(de)(de)情況下,提前關閉窗(chuang)(chuang)口(kou)意味著(zhu)還未聚(ju)合所有(you)的(de)(de)(de)結果(guo),就(jiu)推(tui)送(song)了(le)數(shu)據(ju)(ju),而為了(le)得到(dao)更多的(de)(de)(de)正(zheng)確性(xing)(xing),往往就(jiu)要犧牲實時(shi)性(xing)(xing),這(zhe)也就(jiu)是將窗(chuang)(chuang)口(kou)的(de)(de)(de)關閉根據(ju)(ju) Watermark 來延遲。
而(er)對于 AT ONCE 的(de)觸發模(mo)式(shi),因為(wei)不(bu)會(hui)再有(you)數(shu)據(ju)(ju)源(yuan)源(yuan)不(bu)斷推送的(de)問題,Watermark 更重要的(de)功能是(shi)讓窗口打開與關(guan)閉(bi)處在動態的(de)平衡中,讓“用有(you)限的(de)內(nei)存來處理(li)無(wu)界的(de)數(shu)據(ju)(ju)流與不(bu)斷新增的(de)窗口”成為(wei)可(ke)能。在實(shi)際狀態存儲上,TDengine 3.0 已經實(shi)現了內(nei)存與硬盤兩級,超過內(nei)存的(de)可(ke)以被外溢到硬盤中去,對于狀態存儲,后續我(wo)們還會(hui)進(jin)一步進(jin)行完善。
即使定義了 Watermark,對于亂序仍然超過 Watermark 的數據如何處理呢?我們提供了兩種策略,直接丟棄或從 TSDB(Time-Series Database) 從拉取并(bing)重新(xin)計算,分別對應 IGNORE EXPIRED 1 與 IGNORE EXPIRED 0。不(bu)過從 TSDB 中拉取數據重新(xin)計算只適用于少量(liang)亂序的情況,因為它會(hui)帶來處理速度的降低(di)。
性能指標:單機百萬吞吐,毫秒級延遲
雖然我(wo)將(jiang)性能(neng)指標(biao)作為(wei)了今天演(yan)講(jiang)的副標(biao)題,它是一切新應用場景的基石。但(dan)性能(neng)指標(biao)又并不是一個特別值得看(kan)中的東西:我(wo)們(men)(men)希望用戶(hu)最不用關心的就是性能(neng),因(yin)為(wei)我(wo)們(men)(men)的性能(neng)能(neng)夠滿足絕大多數(shu)場景的需求。為(wei)此,我(wo)們(men)(men)想(xiang)要驗證的是,在一個普(pu)通(tong)的機器上、每秒百萬行數(shu)據(ju)寫入的情況下(xia),TDengine 3.0 仍然可以做到(dao)毫秒級(ji)的延遲(chi)。
我們(men)后續會將 benchmark 完善發(fa)布并讓用戶能更簡單地使用并驗證。

我們(men)的性能測試(shi)主(zhu)要會去驗證以下幾個(ge)方面:
- 測試流對寫入性能的影響:對于有無流情況下的寫入延遲與吞吐量
- 測試流在大寫入吞吐下的結果延遲
- 分別驗證標量函數、每個子表各自聚合、多 vgroup 超級表聚合等幾個主要場景
標量函數變換
create stream perf_stream into perf_db2.output_streamtb as select ts,abs(c1),char_length(tbname),cast(c1 as binary(16)),timezone(),now from perf_db1.stb partition by tbname;

在(zai) 100 個子表(biao)的條(tiao)件下(xia)測試標量函(han)數(shu)計(ji)算,有(you)無流(liu)(liu)對寫入吞(tun)吐(tu)的影響幾乎不大。在(zai)每秒(miao)寫入 200 萬行數(shu)據(ju)的情況下(xia),流(liu)(liu)的結果延遲大概在(zai)幾毫秒(miao);數(shu)據(ju)從客(ke)戶端寫入到(dao) TSDB 到(dao)流(liu)(liu)式計(ji)算引(yin)擎算出最終(zhong)結果的延遲,大約在(zai)幾毫秒(miao)到(dao)十幾毫秒(miao)。
超級表中每個子表各自聚合 : partition by tbname
create stream if not exists perf_stream trigger at_once into perf_db2.output_streamtb as select _wstart as start, min(c1),max(c2), sum(c3), avg(c0), count(c3), first(c0), last(c1), now from perf_db1.stb partition by tbname interval(1s);

我(wo)們選擇了 min、max、sum、avg、count、first、last 作為測(ce)試基準的聚合函數,設置滑動窗口的時間長度(du)是(shi)一(yi)秒(miao),分別測(ce)試了子表數目在 10、100、1k、10k、100k 幾(ji)(ji)個節(jie)點的流結果延遲(chi)數據,跨度(du)在幾(ji)(ji)毫(hao)秒(miao)到十幾(ji)(ji)毫(hao)秒(miao)。
多 vnode 超級表聚合
create stream if not exists perf_stream trigger at_once watermark 30000s into perf_db2.output_streamtb as select _wstart as start, min(c1),max(c2), sum(c3), avg(c0), count(c3), first(c0), last(c1), now from perf_db1.stb interval(1s);
taosBenchmark config: 子表數(shu)目 1k,timestamp_step = 110,線(xian)程(cheng)數(shu) 100,batch_size = 100, interlace = 100(影響(xiang)亂序(xu)程(cheng)度)

這種場景(jing)是典型(xing)的(de)(de)分(fen)布(bu)式的(de)(de)兩級(ji)聚合(he)。在(zai)(zai)(zai)分(fen)布(bu)式兩級(ji)聚合(he)操作中(zhong),要聚合(he)不同子(zi)表中(zhong)的(de)(de)寫入數(shu)據(ju)(ju),因此可能(neng)會存(cun)在(zai)(zai)(zai)一定的(de)(de)亂(luan)(luan)序,我(wo)們在(zai)(zai)(zai) taosBenchmark 中(zhong)設置了(le) interlace 參數(shu)來(lai)控(kong)制數(shu)據(ju)(ju)的(de)(de)亂(luan)(luan)序程度。watermark 設置為(wei) 30000s,這會在(zai)(zai)(zai)一定程度增大內(nei)存(cun)占(zhan)用,但可以(yi)減(jian)少因為(wei)亂(luan)(luan)序觸發(fa)的(de)(de)掃盤。我(wo)們可以(yi)看到,流結果的(de)(de)延(yan)遲仍然(ran)維持(chi)在(zai)(zai)(zai)幾十毫秒。
這個(ge)基準(zhun)測試驗證了 3.0 流式計算引(yin)擎(qing)(qing)性能基本(ben)上可以達到(dao)要(yao)求,當然,目(mu)前(qian) TDengine 的流式計算引(yin)擎(qing)(qing)還是一個(ge)年輕的引(yin)擎(qing)(qing),我們(men)仍(reng)然在做著大量的性能優(you)化(hua),以及更多實際場(chang)景的驗證。
Roadmap
接(jie)下(xia)來 TDengine 3.0 流式(shi)計(ji)算引擎的優(you)化工作將分(fen)為以下(xia)幾方面:
- 更全面的 SQL 支持:Join / Fill / Group by / 子查詢等
- 更完善的流狀態管理,使用戶在 AT ONCE 模式下不再需要關心 Watermark
- 更靈活的 partition 機制:partition by column / 表達式
- 多聚合節點;獨立部署、存算分離的流式計算節點:SNODE
- 可配置的 checkpoint
- Benchmark 完善,端到端的延遲指標、P99 延遲指標
雖然列舉了(le)這些后(hou)續的(de)工作,但真(zhen)正(zheng)決定流(liu)式計算處(chu)理引擎發展的(de),其實是 TDengine 的(de)用戶和(he)社(she)(she)區開發者。我們希(xi)望(wang)大(da)家都能(neng)真(zhen)正(zheng)用上 TDengine 3.0 的(de)流(liu)式計算引擎,能(neng)在開源(yuan)社(she)(she)區中給它貢獻代碼,我們也會多多聆(ling)聽來自客戶以及社(she)(she)區的(de)實時反(fan)饋。
Reference
[1] Tangwongsan, K., Hirzel, M., Schneider, S., & Wu, K. L. (2015). General incremental sliding-window aggregation. Proceedings of the VLDB Endowment, 8(7), 702-713.


























