小 T 導讀: 在(zai) TDengine 的(de)官方文(wen)(wen)檔上放出來(lai)已經有(you)一段時間了(le),我(wo)們(men)也收到了(le)一些開發者的(de)反饋。文(wen)(wen)檔中的(de)教程使用(yong) Confluent 平(ping)臺(集成了(le) Kafka)演示了(le)如何(he)使用(yong) Source Connector 和 Sink Connector,但是很多(duo)開發者在(zai)生產環境中并沒有(you)使用(yong) Confluent,所以為方便大家,本文(wen)(wen)將(jiang)使用(yong)獨立部署(shu)的(de) Kafka 來(lai)演示。
本文包含以下內容:
- 如何使用 TDengine Sink Connector, 把數據從 Kafka 同步到 TDengine。
- TDengine Sink Connector 的實現原理。
- 一個簡單的測試腳本,幫助你在自己的環境中快速測試。通過更改生成測試數據的程序和配置參數,你可以模擬自己的使用場景。
- 測試同步同一個 topic,使用不同分區數和不同 Sink 任務數對性能的影響。
背景知識
如果你(ni)對(dui)文章(zhang)開(kai)頭(tou)出(chu)現(xian)的術語并不陌生,那(nei)么可以跳過這一(yi)部分。
· 什么是 Kafka?
Kafka 的核心(xin)是(shi)一個(ge)通用的、分布式的、可重復消費(fei)的消息隊列。
與之相比,作為一款時序數據庫(Time Series Database),TDengine 也可看(kan)作針(zhen)對(dui)結構化的(de)時序數據的(de)消息隊列。
· 什么是 Kafka Connect? 為什么使用 Kafka Connect?
Kafka Connect 是 Kafka 的一個組件,簡化了 Kafka 與其它數據(ju)(ju)源的集成。用戶(hu)通過 Kafka Connect 讀寫 Kafka;通過 Kafka Connect 插件(也稱 Kafka Connector)來(lai)讀寫各(ge)種數據(ju)(ju)源。
為(wei)方便(bian)集成(cheng),Kafka 已經提供了生產(chan)者(zhe)和消費者(zhe) API 以及(ji)客(ke)戶端庫(ku),那為(wei)什么還(huan)需要(yao) Kafka Connect 呢?因為(wei)一個好的 Kafka 客(ke)戶端程序,不(bu)是單單生產(chan)或(huo)消費數據,還(huan)需要(yao)考(kao)慮(lv)容錯、重啟(qi)、日志、彈性伸縮、序列化(hua)以及(ji)反序列化(hua)等(deng)。當(dang)開(kai)發(fa)者(zhe)自己完成(cheng)了這一切,就相當(dang)于開(kai)發(fa)了一個和 Kafka Connect 類似的東西。
與 Kafka 集(ji)成是 Kafka Connect 已經解決的問題,用戶不需(xu)(xu)要重復造(zao)輪子(zi),只有少數邊緣場景才需(xu)(xu)要定制化的集(ji)成方(fang)案。
TDengine Sink Connector 的實現原理
TDengine Sink Connector 用(yong)于將(jiang) Kafka 中指定 topic 的數據(批量或(huo)實時)同步到 TDengine 的 database 中。
啟動 Sink Connector 需要一個 properties 配置文件。詳細(xi)配置見(jian)官方(fang)文檔的。
Sink Connector 內部的實現非常(chang)簡單,整(zheng)體(ti)工作流程分為以下幾(ji)個步驟:
- Connect 框架根據配置啟動 N 個消費者線程。
- N 個消費者同時訂閱數據,并用配置文件中指定的 key.converter 和 value.converter 做反序列化。
- Connect 框架把反序列化后的數據傳遞給 N 個 SinkTask 的實例。
- SinkTask 使用 TDengine 提供的 schemaless 寫入接口來寫入數據。
上(shang)述(shu) 4 個步驟,只有最(zui)后一步寫數據(ju)是(shi) Sink Connector 需要關心的,其(qi)它都(dou)是(shi) Connect 框架自動實現的。
下面重點討論幾個問(wen)題。
· 支持的數據格式
因為(wei)使用了 schemaless 寫入接口,因此 TDengine Sink Connector 只支持三種格(ge)式的(de)數(shu)據(ju)(ju):、 和 。使用配置項 db.schemaless 來指定寫入時使用的(de)數(shu)據(ju)(ju)格(ge)式。例如:
db.schemaless=line
如果 Kafka 中的(de)(de)數據(ju)已(yi)經是這三種格式之一,那么配置文件中的(de)(de) value.converer,只(zhi)需指(zhi)定為 Connnect 內(nei)置的(de)(de) org.apache.kafka.connect.storage.StringConverter。
value.converter=org.apache.kafka.connect.storage.StringConverter
如果 Kafka 中已有(you)的數據不(bu)是上述(shu)三種(zhong)之一(yi),則需要(yao)實現自己的 Converter 類, 將其轉換為三種(zhong)格(ge)式之一(yi),也許能(neng)幫到你。
· 如何指定 Consumer 的參數?
既然 Connect 框架(jia)已經幫我們(men)做了 Consumer 要做的(de)事,那么(me)我們(men)怎么(me)來控(kong)(kong)制(zhi) Consumer 的(de)行為呢?比(bi)如如何(he)(he)控(kong)(kong)制(zhi) Consumer 訂閱(yue)的(de)主題?如何(he)(he)控(kong)(kong)制(zhi) Consumer 每次(ci) poll 的(de)消(xiao)息數和時間(jian)間(jian)隔?
對于訂閱(yue)哪些主題,可以用配置(zhi)項 topics 來指定(ding)。
如果想(xiang)(xiang)覆蓋(gai) Consumer 的(de)其它默認配(pei)置(zhi),可以直接(jie)在(zai) Sink Connector 的(de)配(pei)置(zhi)文(wen)件中編寫,但是要加前綴 “consumer.override.”,比(bi)如想(xiang)(xiang)把每次(ci) poll 的(de)最大消(xiao)息數(shu)改為(wei) 3000, 可以這樣配(pei)置(zhi):
consumer.override.max.poll.records=3000
· 如何控制寫入線程數?
對于(yu) Kafka Connect Sink,task 本質上就(jiu)是消費者線程(cheng),接收從(cong) topic 的(de)(de)分區(qu)(qu)讀出來的(de)(de)數(shu)(shu)(shu)(shu)(shu)(shu)據。用配置(zhi)參數(shu)(shu)(shu)(shu)(shu)(shu) tasks.max 來控制最大(da)任(ren)務(wu)數(shu)(shu)(shu)(shu)(shu)(shu),一個(ge)(ge)(ge)任(ren)務(wu)一個(ge)(ge)(ge)線程(cheng)。實際(ji)啟(qi)動(dong)的(de)(de)任(ren)務(wu)數(shu)(shu)(shu)(shu)(shu)(shu)還與(yu)(yu) topic 的(de)(de)分區(qu)(qu)數(shu)(shu)(shu)(shu)(shu)(shu)有(you)關。如(ru)(ru)果(guo)你有(you) 10 個(ge)(ge)(ge)分區(qu)(qu),并(bing)且 tasks.max 設置(zhi)為(wei) 5, 那么每(mei)個(ge)(ge)(ge) task 會(hui)收到(dao) 2 個(ge)(ge)(ge)分區(qu)(qu)的(de)(de)數(shu)(shu)(shu)(shu)(shu)(shu)據,并(bing)跟蹤(zong) 2 個(ge)(ge)(ge)分區(qu)(qu)的(de)(de) offsets。如(ru)(ru)果(guo)你配置(zhi)的(de)(de) tasks.max 比 partition 數(shu)(shu)(shu)(shu)(shu)(shu)大(da), Connect 會(hui)啟(qi)動(dong)的(de)(de) task 數(shu)(shu)(shu)(shu)(shu)(shu)與(yu)(yu) topic 的(de)(de) partition 數(shu)(shu)(shu)(shu)(shu)(shu)相同。如(ru)(ru)果(guo)你訂閱了 5 個(ge)(ge)(ge) topic,每(mei)個(ge)(ge)(ge) topic 都(dou)是 1 個(ge)(ge)(ge)分區(qu)(qu), 并(bing)且設置(zhi) tasks.max = 5, 那么實際(ji)會(hui)啟(qi)動(dong)多少個(ge)(ge)(ge)任(ren)務(wu)呢?答(da)案是 1 個(ge)(ge)(ge), 任(ren)務(wu)數(shu)(shu)(shu)(shu)(shu)(shu)與(yu)(yu) topic 數(shu)(shu)(shu)(shu)(shu)(shu)量沒有(you)關系(xi)。
TDengine Sink Connector 使用示例
這(zhe)一(yi)部分(fen)我們在(zai)一(yi)臺 Linux 服務器(qi)上(shang)搭(da)建測試環(huan)境,并(bing)運行簡單(dan)的(de)示(shi)例(li)程序。示(shi)例(li)中將(jiang) Kafka 部署(shu)到了個人的(de) home 目錄(lu)。操作(zuo)時請注意(yi)把(ba)路徑中的(de)用(yong)戶(hu)名(bding)替換為自己的(de)用(yong)戶(hu)名。
· 環境準備
- Java 1.8
- Maven
- 了 TDengine 相關服務進程:taosd 和 taosAdapter。
第一步:安裝 Kafka
wget //dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgztar -xzf kafka_2.13-3.2.0.tgz
編輯 .bash_profile, 加入:
export KAFKA_HOME=/home/bding/kafka_2.13-3.2.0export PATH=$PATH:$KAFKA_HOME/bin
source .bash_profile
第二步:配置 Kafka
配置 Kafka Connect 加載插件的路徑。
cd kafka_2.13-3.2.0/config/vi connect-standalone.properties
追加
plugin.path=/home/bding/connectors
修(xiu)改 Connector 插(cha)件的(de)日(ri)志級別。這一步(bu)非常重要,我們將(jiang)通(tong)過插(cha)件的(de)日(ri)志統計同步(bu)數據花費的(de)時間(jian)。
vi connect-log4j.properties
追加
log4j.logger.com.taosdata.kafka.connect.sink=DEBUG
第三步:編譯并安裝插件
git clone git@github.com:taosdata/kafka-connect-tdengine.gitcd kafka-connect-tdenginemvn clean packageunzip -d ~/connectors target/components/packages/taosdata-kafka-connect-tdengine-*.zip
第四步:啟動 ZooKeeper Server 和 Kafka Server
zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.propertieskafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
第五步:創建 topic
kafka-topics.sh --create --topic meters --partitions 1 --bootstrap-server localhost:9092
第六步:生成測試數據
將下列腳(jiao)本(ben)保(bao)存為 gen-data.py:
#!/usr/bin/python3import randomimport systopic = sys.argv[1]count = int(sys.argv[2])start_ts = 1648432611249000000location = ["SanFrancisco", "LosAngeles", "SanDiego"]for i in range(count):ts = start_ts + irow = f"{topic},location={location[i % 3]},groupid=2 current={random.random() * 10},voltage={random.randint(100, 300)},phase={random.random()} {ts}"print(row)
然后執行:
python3 gen-data.py meters 10000 | kafka-console-producer.sh --broker-list localhost:9092 --topic meters
生(sheng)成 10000 條(tiao) InfluxDB 行(xing)協議格式的數(shu)據到 topic meters。每條(tiao)數(shu)據又包含 2 個(ge)標簽字段和 3 個(ge)數(shu)據字段。
第七步:啟動 Kafka Connect
將下列配置保存為 sink-test.properties。
name=TDengineSinkConnectorconnector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnectortasks.max=1topics=metersconnection.url=jdbc:TAOS://127.0.0.1:6030connection.user=rootconnection.password=taosdataconnection.database=powerdb.schemaless=linekey.converter=org.apache.kafka.connect.storage.StringConvertervalue.converter=org.apache.kafka.connect.storage.StringConverter
然后執行:
connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties sink-test.properties
第八步:檢查 TDengine 中的數據
使用 TDengine CLI 查詢 power 數(shu)據(ju)庫 meters 表(biao),檢查是否正好包含 10000 條數(shu)據(ju)。
[bding@vm95 test]$ taosWelcome to the TDengine shell from Linux, Client Version:2.6.0.4Copyright (c) 2022 by TAOS Data, Inc. All rights reserved.taos> select count(*) from power.meters;count(*) |========================10000 |
TDengine Sink Connector 性能測試
· 測試流程
這一(yi)部分,我們將上面示例步驟中的(de)(de)第四步到第七步封(feng)裝成可重復運行的(de)(de) shell 腳本,并做以下修改:
- 將 topic 的分區數作為腳本的第 1 個參數, 同時配置 tasks.max,使其等于分區數。這樣我們可以控制每次測試使用的寫入線程數。
- 將生成測試數據的條數作為腳本的第 2 個參數,用來控制每次測試同步的數據量。
- 啟動測試前清空所有數據,測試結束后停止 Connect、Kafka 和 ZooKeeper。
每次(ci)測試都先寫數(shu)據到 Kafka,然后(hou)再啟動 Connect 同(tong)(tong)步數(shu)據到 TDengine,這(zhe)(zhe)樣做可以把同(tong)(tong)步數(shu)據的(de)壓(ya)力全部集中到 Sink 插件這(zhe)(zhe)邊。我們統(tong)計 Sink Connector 從(cong)接(jie)收(shou)到第一批(pi)數(shu)據到接(jie)收(shou)到最后(hou)一批(pi)數(shu)據之間的(de)時間,作為同(tong)(tong)步數(shu)據的(de)總耗時。
完整腳本如下:
#!/bin/bashif [ $# -lt 2 ];thenecho "Usage: ./run-test.sh <num_of_partitions> <total_records>"exit 0fi echo "---------------------------TEST STARTED---------------------------------------"echo clean data and logstaos -s "DROP DATABASE IF EXISTS power"rm -rf /tmp/kafka-logs /tmp/zookeeperrm -f $KAFKA_HOME/logs/connect.lognp=$1 # number of partitionstotal=$2 # number of recordsecho number of partitions is $np, number of recordes is $total.echo start zookeeperzookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.propertiesecho start kafkasleep 3kafka-server-start.sh -daemon $KAFKA_HOME/config/server.propertiessleep 5echo create topickafka-topics.sh --create --topic meters --partitions $np --bootstrap-server localhost:9092kafka-topics.sh --describe --topic meters --bootstrap-server localhost:9092echo generate test datapython3 gen-data.py meters $total | kafka-console-producer.sh --broker-list localhost:9092 --topic metersecho alter connector configuration setting tasks.max=$npsed -i "s/tasks.max=.*/tasks.max=${np}/" sink-test.propertiesecho start kafka connectconnect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties sink-test.propertiesecho -e "\e[1;31m open another console to monitor connect.log. press enter when no more data received.\e[0m"readecho stop connectjps | grep ConnectStandalone | awk '{print $1}' | xargs killecho stop kafka serverkafka-server-stop.shecho stop zookeeperzookeeper-server-stop.sh# extract timestamps of receiving the first batch of data and the last batch of datagrep "records" $KAFKA_HOME/logs/connect.log | grep meters- > tmp.logstart_time=`cat tmp.log | grep -Eo "[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}" | head -1`stop_time=`cat tmp.log | grep -Eo "[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}" | tail -1`echo "--------------------------TEST FINISHED------------------------------------"echo "| records | partitions | start time | stop time |"echo "|---------|------------|------------|-----------|"echo "| $total | $np | $start_time | $stop_time |"
如果要測試使用(yong) 1 個分(fen)區(qu),共 100 萬條數據的(de)性(xing)能,可以這(zhe)樣執行(xing):
./run-test.sh 1 1000000
執行過(guo)程的截(jie)圖如下:

注意中間有一個交互過程。因為腳本無法確定數(shu)據是(shi)否(fou)同步完,需(xu)要用(yong)戶監控(kong) connect.log 來確定是(shi)否(fou)已經消費完了所有數(shu)據,例(li)如:
[bding@vm95 ~]$ cd kafka_2.13-3.2.0/logs/[bding@vm95 logs]$ tail -f connect.log[2022-06-21 17:39:00,176] DEBUG [TDengineSinkConnector|task-0] Received 500 records. First record kafka coordinates:(meters-0-314496). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)[2022-06-21 17:39:00,180] DEBUG [TDengineSinkConnector|task-0] Received 500 records. First record kafka coordinates:(meters-0-314996). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)
當日(ri)志不再滾(gun)動,就說明(ming)已經消費完了(le)
· 測試結果
寫入速度與數據量和線程數的關系表

上(shang)表第 1 列為總數(shu)(shu)據(ju)量,第 1 行為消費(fei)者線(xian)程數(shu)(shu),也是寫(xie)入線(xian)程數(shu)(shu)。中間(jian)為平均(jun)每秒寫(xie)入記錄數(shu)(shu)。
寫入速度與數據量和線程數的關系圖

結果分析
從上(shang)圖可以看出,相同(tong)數據量,線(xian)程(cheng)越多寫(xie)(xie)(xie)入(ru)速度(du)越快。當(dang)使(shi)用單線(xian)程(cheng)寫(xie)(xie)(xie)入(ru)時,每(mei)秒(miao)能寫(xie)(xie)(xie)入(ru)大(da)概 10 萬以上(shang)。當(dang)使(shi)用 5 個(ge)線(xian)程(cheng)寫(xie)(xie)(xie)入(ru)時,每(mei)秒(miao)寫(xie)(xie)(xie)入(ru)大(da)概 35 萬左(zuo)右(you)。當(dang)使(shi)用10 個(ge)線(xian)程(cheng)時,每(mei)秒(miao)能寫(xie)(xie)(xie)入(ru)55 萬左(zuo)右(you)。
寫入速度比較(jiao)平穩,與總數據量關系不(bu)大。
同時(shi)也發現線(xian)程(cheng)增(zeng)加越多,線(xian)程(cheng)增(zeng)加帶來的速度提升越少。線(xian)程(cheng)數從(cong)(cong) 1 變(bian)到 10,速度只從(cong)(cong) 10 萬變(bian)到 50 萬。可能的原(yuan)因是數據(ju)(ju)在各個分(fen)(fen)區分(fen)(fen)布不均勻。有(you)的 task 執(zhi)(zhi)行時(shi)間(jian)長,有(you)的 task 執(zhi)(zhi)行時(shi)間(jian)短,數據(ju)(ju)量越大,數據(ju)(ju)傾斜越大。比(bi)如 1000 萬數據(ju)(ju),10個分(fen)(fen)區的時(shi)候,各分(fen)(fen)區的數據(ju)(ju)量:
[bding@vm95 kafka-logs]$ du -h ./ -d 1125M ./meters-8149M ./meters-7119M ./meters-9138M ./meters-4110M ./meters-3158M ./meters-6131M ./meters-5105M ./meters-0113M ./meters-299M ./meters-1
另一個影響多線程寫入速度的是數據的亂序程度。本測試場景中,多條時間線的數據隨機分配到了不同分區,當(dang)單線程寫(xie)入(ru)時(shi)(即 1 個分區(qu)時(shi)),數(shu)據是嚴(yan)格有(you)序(xu)的,寫(xie)入(ru)速(su)度最快。線程越(yue)多(duo)亂序(xu)程度越(yue)大。
所以在實際應用場景(jing)中,建議將同一(yi)個子表的數據,放在 Kafka 同一(yi)個分(fen)區中。
附錄
· 測試程序
本文中用到的所有代(dai)碼(ma)和原始測(ce)試結(jie)果數據都已上傳到 。
· 測試環境



























