无码人妻精品一区二区三18禁,影音先锋男人AV橹橹色,污污污污污污www网站免费,日韩成人av无码一区二区三区,欧美性受xxxx狂喷水

TDengine中訂閱的用途和用法

Bomin Zhang

2020-02-12 / ,

本(ben)文(wen)將介紹TDengine Database訂閱功(gong)能的使用(yong)場景(jing)、使用(yong)方法和一些(xie)限制,并與(yu)InfluxDB的訂閱功(gong)能進行簡單的對比。本(ben)文(wen)的預期讀者是(shi)基于TDengine開(kai)發各種應用(yong)的軟件開(kai)發人員(yuan)。

什么是訂閱?

訂(ding)閱,是一(yi)(yi)種(zhong)(zhong)數(shu)據(ju)(ju)查詢方式(shi)(shi),其特點(dian)為:客戶端(duan)執行一(yi)(yi)個查詢語句后,可以(yi)增量(liang)形式(shi)(shi),不(bu)斷收(shou)到新到達服(fu)務(wu)端(duan)的(de)(de)、符合查詢條件的(de)(de)數(shu)據(ju)(ju)。訂(ding)閱的(de)(de)實(shi)現模型有兩種(zhong)(zhong),一(yi)(yi)種(zhong)(zhong)是“推”,即服(fu)務(wu)器主(zhu)動(dong)將(jiang)數(shu)據(ju)(ju)發到客戶端(duan);另一(yi)(yi)種(zhong)(zhong)是“拉”,即客戶端(duan)主(zhu)動(dong)向服(fu)務(wu)器請求數(shu)據(ju)(ju)。兩種(zhong)(zhong)方式(shi)(shi)各有優(you)缺點(dian),這里(li)不(bu)做詳細的(de)(de)對(dui)比,只是說明一(yi)(yi)下,TDengine Database使用的(de)(de)是“拉”模型。

什么時候需要使用訂閱?

為了(le)(le)便于(yu)用戶程(cheng)序消費TDengine Database中的(de)數(shu)據(ju)(ju),TDengine實(shi)現了(le)(le)基于(yu)SQL的(de)數(shu)據(ju)(ju)查(cha)詢語(yu)法,并(bing)提供了(le)(le)豐富的(de)聚合函(han)數(shu),這種方式的(de)優勢已在多(duo)個實(shi)際案例中得到了(le)(le)體(ti)現。但由于(yu)時序數(shu)據(ju)(ju)的(de)特點,單(dan)純(chun)的(de)直接數(shu)據(ju)(ju)查(cha)詢并(bing)不能(neng)滿(man)足(zu)用戶程(cheng)序的(de)需求(qiu),比如:我們管理著(zhu)一(yi)批(pi)溫度(du)測(ce)量設(she)備(bei),希望當某個設(she)備(bei)檢測(ce)到的(de)溫度(du)超(chao)過限制(比如80°C)后能(neng)得到通知(zhi)并(bing)進行一(yi)些處(chu)理時,肯定(ding)會先(xian)為所有(you)的(de)設(she)備(bei)建立一(yi)張超(chao)級表:

create database test;
use test;
create table devices (ts timestamp, temperature float) tags(id int);

并為(wei)每個設備創建一張子表(biao):

create table device1 using devices tags(1);
create table device2 using devices tags(2);
...

這種設計(ji)滿(man)足了(le)設備(bei)管理的(de)(de)需求(qiu),但如何(he)滿(man)足溫度監測的(de)(de)需求(qiu)呢?如果僅使用(yong)普通的(de)(de)查(cha)詢,有兩種方法:一是分(fen)別對每張子表進行查(cha)詢,每次查(cha)詢后(hou)記錄最后(hou)一條數據的(de)(de)時(shi)間(jian)戳(chuo),后(hou)續只查(cha)詢這個時(shi)間(jian)戳(chuo)之后(hou)的(de)(de)數據:

select * from device1 where ts > last_timestamp1 and temperature > 80;
select * from device2 where ts > last_timestamp2 and temperature > 80;
...

這確實(shi)可行(xing),但隨著設(she)備數(shu)量的增加,查詢數(shu)量也(ye)會(hui)增加,客戶端和服務端的性能都(dou)會(hui)受到影響,當設(she)備數(shu)增長到一(yi)定(ding)的程度,系統(tong)就(jiu)無(wu)法(fa)承受了。

另一種(zhong)方法是對超級表進行查(cha)詢(xun)。這樣(yang),無論有多少設備,都(dou)只需一次(ci)查(cha)詢(xun):

select * from devices where ts > last_timestamp and temperature > 80;

但是,如何選擇 last_timestamp 就成了一個新的問題。因為,一方面數據的產生時間(也就是數據時間戳)和數據入庫的時間一般并不相同,有時偏差還很大;另一方面,不同設備的數據到達TDengine的時間也會有差異。所以,如果我們在查詢中使用最慢的那臺設備的數據的時間戳作為 last_timestamp ,就可(ke)能(neng)重(zhong)復(fu)讀(du)入其(qi)它設備的(de)數(shu)據(ju);如果使用最快的(de)設備的(de)時間戳,其(qi)它設備的(de)數(shu)據(ju)就可(ke)能(neng)被漏掉。

TDengine的訂閱(yue)功能為上面這個(ge)(ge)問題(ti)提供了一個(ge)(ge)徹底的解決方(fang)案(an)。

如何使用TDengine中的訂閱功能?

TDengine的API中,與(yu)訂(ding)閱相關(guan)的主要有以(yi)下三個:

  • taos_subscribe
  • taos_consume
  • taos_unsubscribe

這三個(ge)(ge)API的(de)(de)具體說明請見《》,下(xia)面結合一個(ge)(ge)示例,介紹下(xia)其使用方法,完(wan)整的(de)(de)示例代碼可以(yi)在找到。

首先是創建訂閱:

TAOS_SUB* tsub = NULL;
if (async) {
  // create an asynchronized subscription, the callback function will be called every 1s
  tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
} else {
  // create an synchronized subscription, need to call 'taos_consume' manually
  tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
}

TDengine中的訂閱既可以是同步的,也可以是異步的,上面的代碼會根據從命令行獲取的參數async的值來決定使用哪種方式。這里,同步的意思是用戶程序要直接調用 taos_consume來拉取數據,而異步則由API在內部的另一個線程中調用taos_consume,然后把拉取到的數據交給回調函數 subscribe_callback去處理。

參數taos是(shi)一個已經建立(li)好的(de)(de)數(shu)(shu)據庫連接,在同步(bu)模式下無特(te)殊要求。但在異步(bu)模式下,需要注(zhu)意它不(bu)會被其它線程使用,否則(ze)可能導(dao)致不(bu)可預(yu)計的(de)(de)錯(cuo)誤,因為回(hui)調(diao)函(han)數(shu)(shu)在API的(de)(de)內部線程中被調(diao)用,而TDengine的(de)(de)部分API不(bu)是(shi)線程安全的(de)(de)。

參數sql是查詢語句,可以在其中使用where子(zi)句指定(ding)過濾條件(jian)。回到開(kai)頭(tou)的(de)(de)例子(zi),如(ru)果我們只想訂(ding)閱設備溫度超(chao)過 80°C 時的(de)(de)數據,可以這樣寫:

select * from devices where temperature > 80;

注意,這里沒有指定起(qi)始(shi)時(shi)間(jian),所(suo)以會讀到所(suo)有時(shi)間(jian)的(de)數(shu)據(ju)。如果只想從一天前的(de)數(shu)據(ju)開始(shi)訂(ding)閱,而不需要更早的(de)歷(li)史數(shu)據(ju),可以再加上一個時(shi)間(jian)條件:

select * from devices where ts > now - 1d and temperature > 80;

訂閱的topic實(shi)際(ji)上是(shi)它(ta)(ta)的名(ming)字,因為訂閱功能是(shi)在(zai)客戶(hu)端API中實(shi)現的,所(suo)以沒(mei)必要保(bao)證它(ta)(ta)全(quan)局唯一(yi),但需要它(ta)(ta)在(zai)一(yi)臺客戶(hu)端機器上唯一(yi)。

如果名topic的訂閱不存在,參數restart沒有意義;但如果用戶程序創建這個訂閱后退出,當它再次啟動并重新使用這個topic時,restart就會被用于決定是從頭開始讀取數據,還是接續上次的位置進行讀取。本例中,如果restarttrue(非零值),用戶程序肯定會讀到所有數據。但如果這個訂閱之前就存在了,并且已經讀取了一部分數據,且restartfalse(0),用戶程序(xu)就(jiu)不會讀到之前(qian)已經(jing)讀取的數據了。

taos_subscribe的最后一個參數是以毫秒為單位的輪詢周期。在同步模式下,如過前后兩次調用taos_consume的時間間隔小于此時間,taos_consume會阻塞,直到間(jian)(jian)隔超(chao)過(guo)此時(shi)(shi)間(jian)(jian)。異步模式下(xia),這個時(shi)(shi)間(jian)(jian)是兩次調用回調函數的最小時(shi)(shi)間(jian)(jian)間(jian)(jian)隔。

taos_subscribe的倒數(shu)第二個參(can)數(shu)用于用戶程(cheng)序向(xiang)回(hui)調函數(shu)傳遞附加(jia)參(can)數(shu),訂(ding)閱(yue)API不對其(qi)做任(ren)何(he)處理,只原(yuan)樣傳遞給回(hui)調函數(shu)。此參(can)數(shu)在同步模式(shi)下無意義。

訂閱創建以后,就可以消費其數據了,同步模式下,示例代碼是下面的 else 部分:

if (async) {
  getchar();
} else while(1) {
  TAOS_RES* res = taos_consume(tsub);
  if (res == NULL) {
    printf("failed to consume data.");
    break;
  } else {
    print_result(res, blockFetch);
    getchar();
  }
}

這里是一個while循環,用戶每按一次回車鍵就調用一次taos_consume,而taos_consume的返回值是查詢到的結果集,與taos_use_result完全相同,例子中使用這個結果集的代碼是函數print_result

void print_result(TAOS_RES* res, int blockFetch) {
  TAOS_ROW row = NULL;
  int num_fields = taos_num_fields(res);
  TAOS_FIELD* fields = taos_fetch_fields(res);
  int nRows = 0;
  if (blockFetch) {
    nRows = taos_fetch_block(res, &row);
    for (int i = 0; i < nRows; i++) {
      char temp[256];
      taos_print_row(temp, row + i, fields, num_fields);
      puts(temp);
    }
  } else {
    while ((row = taos_fetch_row(res))) {
      char temp[256];
      taos_print_row(temp, row, fields, num_fields);puts(temp);
      nRows++;
    }
  }
  printf("%d rows consumed.\n", nRows);
}

其中的 taos_print_row 用(yong)于處理訂閱到數(shu)據,在我們的(de)例子中,它會打印出所有(you)符(fu)合條件的(de)記錄。而(er)異步模式下(xia),消(xiao)費(fei)訂閱到的(de)數(shu)據則(ze)顯得更為簡單:

void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
  print_result(res, *(int*)param);
}

當要結束一次數據訂閱時,需要調用taos_unsubscribe:

taos_unsubscribe(tsub, keep);

其第二個參數,用于決定是否在客戶端保留訂閱的進度信息,如果大家還記得前面說過“訂閱功能是在客戶端API中實現的”的話,應該可以猜到,如果這個參數是false(0),那無論下次調用taos_subscribe的時的restart參數是什么,訂閱都只能重新開始了。另外,進度信息的保存位置是{DataDir}/subscribe/,這個目錄下,每個訂閱有一個與其topic同名的(de)(de)文(wen)件,刪掉某(mou)個(ge)文(wen)件,同樣會導致下次創建其對(dui)應的(de)(de)訂閱時只(zhi)能重新開始(shi)。

代碼介紹完(wan)畢,我們來看(kan)一下實際的運行效果(guo)。假設:

  • 示例代碼已經下載到本地
  • TDengine 也已經在同一臺機器上安裝好
  • 已經按照本文開頭的腳本創建數據庫、超級表和一些子表

則可(ke)以(yi)在(zai)示(shi)例代碼所在(zai)目錄執行以(yi)下命(ming)令來編譯并啟動示(shi)例程序:

$ make
$ ./subscribe -sql='select * from devices where temperature > 80;'

示例程序啟動后,打開另一個終端窗口,啟動 TDengine 的 shell 向 device1 插(cha)入一條溫度為 90 °C 的數據:

$ taos
> use test;
> insert into device1 values(0, 90);

這時,因為(wei)溫度超過了(le)(le) 80 °C ,您應該(gai)可以看到示(shi)例程序將它輸出到了(le)(le)屏幕上。您可以繼續插(cha)入一些數據觀察示(shi)例程序的輸出。

用作消息隊列

本文(wen)開頭(tou)的例子,是用(yong)訂閱實現了一(yi)個報警監控的功能,但其實訂閱也可以用(yong)在(zai)其它場景(jing)中(zhong),比如:消息隊(dui)列。

應用(yong)程(cheng)序可以訂閱(yue)數據庫某(mou)些表(biao)的內容,同一(yi)(yi)個(ge)(ge)表(biao)也可以被多(duo)個(ge)(ge)應用(yong)訂閱(yue),一(yi)(yi)旦表(biao)有(you)新(xin)的記錄,應用(yong)將(jiang)立即得到通知。這樣,再把(ba)數據插入看做(zuo)Publish操作,用(yong)戶完全可以把(ba)TDengine作為一(yi)(yi)個(ge)(ge)消息隊列中間(jian)件來使(shi)用(yong)。

所以,當下(xia)(xia)次面對需(xu)要使用Kafka的場景時,不(bu)妨先考慮(lv)下(xia)(xia)TDengine,因為TDengine除(chu)了(le)安裝(zhuang)包超(chao)小(xiao)、運維超(chao)簡單的優點外,還有(you)一個Kafka不(bu)具(ju)備(bei)的功能——數據過濾:可(ke)以在查(cha)詢語句中指定過濾條(tiao)件,保證讀到的數據都是有(you)用的,不(bu)用再(zai)在代碼中手寫過濾邏輯(ji)了(le)。

與InfluxDB的對比

概念上說(shuo),InfluxDB的(de)訂(ding)閱和TDengine的(de)訂(ding)閱區(qu)別(bie)很(hen)大,我們可以認為(wei)訂(ding)閱在InfluxDB中(zhong)更像(xiang)一種(zhong)數據同(tong)步機(ji)(ji)制,而TDengine中(zhong)的(de)訂(ding)閱則是一種(zhong)數據查詢機(ji)(ji)制:

  • InfluxDB將收到的數據實時推送給其它節點,TDengine通過輪詢的方式拉取數據,InfluxDB具有更好的實時性。
  • InfluxDB中只能訂閱全部數據,TDengine中可以指定數據過濾條件。
  • InfluxDB中只能訂閱當前時間之后的數據,TDengine中可以在訂閱中讀到歷史數據。

所(suo)以,兩相對(dui)比,InfluxDB的(de)優勢是實(shi)時(shi)性,而TDengine則以稍微犧牲實(shi)時(shi)性為代價提供(gong)了更強大(da)的(de)功(gong)能。

限制條件

下面是(shi)一些(xie)TDengine訂閱功能的(de)局限,大家需(xu)要在使用中注意。

  • 訂閱的查詢語句只能是 select 語句,只能查詢原始數據(不支持聚合函數),只能按時間正序查詢數據。
  • 在滿足應用需求的情況下,請盡量將輪詢周期設置的大一些,否則會對系統性能造成影響。
  • 暫不支持亂序數據,用戶程序可能讀不到使用import方式插入的數據。
  • 如果用戶程序異常退出或沒有正確調用taos_unsubscribe,進度信息可能會有錯誤,這時,后續的同名訂閱可能讀到之前已經讀過的數據。