无码人妻精品一区二区三18禁,影音先锋男人AV橹橹色,污污污污污污www网站免费,日韩成人av无码一区二区三区,欧美性受xxxx狂喷水

如何同步 Kafka 的數據到 TDengine? 性能如何?

小 T 導讀: 在(zai) TDengine 的(de)官方文(wen)(wen)檔上放出來(lai)已經有(you)一段時間了(le),我(wo)們(men)也收到了(le)一些開發者的(de)反饋。文(wen)(wen)檔中的(de)教程使用(yong) Confluent 平(ping)臺(集成了(le) Kafka)演示了(le)如何(he)使用(yong) Source Connector 和 Sink Connector,但是很多(duo)開發者在(zai)生產環境中并沒有(you)使用(yong) Confluent,所以為方便大家,本文(wen)(wen)將(jiang)使用(yong)獨立部署(shu)的(de) Kafka 來(lai)演示。

本文包含以下內容:

  1. 如何使用 TDengine Sink Connector, 把數據從 Kafka 同步到 TDengine。
  2. TDengine Sink Connector 的實現原理。
  3. 一個簡單的測試腳本,幫助你在自己的環境中快速測試。通過更改生成測試數據的程序和配置參數,你可以模擬自己的使用場景。
  4. 測試同步同一個 topic,使用不同分區數和不同 Sink 任務數對性能的影響。

背景知識

如果你(ni)對(dui)文章(zhang)開(kai)頭(tou)出(chu)現(xian)的術語并不陌生,那(nei)么可以跳過這一(yi)部分。

· 什么是 Kafka?

Kafka 的核心(xin)是(shi)一個(ge)通用的、分布式的、可重復消費(fei)的消息隊列。

與之相比,作為一款時序數據庫(Time Series Database),TDengine 也可看(kan)作針(zhen)對(dui)結構化的(de)時序數據的(de)消息隊列。

· 什么是 Kafka Connect? 為什么使用 Kafka Connect?


Kafka Connect 是 Kafka 的一個組件,簡化了 Kafka 與其它數據(ju)(ju)源的集成。用戶(hu)通過 Kafka Connect 讀寫 Kafka;通過 Kafka Connect 插件(也稱 Kafka Connector)來(lai)讀寫各(ge)種數據(ju)(ju)源。

為(wei)方便(bian)集成(cheng),Kafka 已經提供了生產(chan)者(zhe)和消費者(zhe) API 以及(ji)客(ke)戶端庫(ku),那為(wei)什么還(huan)需要(yao) Kafka Connect 呢?因為(wei)一個好的 Kafka 客(ke)戶端程序,不(bu)是單單生產(chan)或(huo)消費數據,還(huan)需要(yao)考(kao)慮(lv)容錯、重啟(qi)、日志、彈性伸縮、序列化(hua)以及(ji)反序列化(hua)等(deng)。當(dang)開(kai)發(fa)者(zhe)自己完成(cheng)了這一切,就相當(dang)于開(kai)發(fa)了一個和 Kafka Connect 類似的東西。

與 Kafka 集(ji)成是 Kafka Connect 已經解決的問題,用戶不需(xu)(xu)要重復造(zao)輪子(zi),只有少數邊緣場景才需(xu)(xu)要定制化的集(ji)成方(fang)案。

TDengine Sink Connector 的實現原理

TDengine Sink Connector 用(yong)于將(jiang) Kafka 中指定 topic 的數據(批量或(huo)實時)同步到 TDengine 的 database 中。

啟動 Sink Connector 需要一個 properties 配置文件。詳細(xi)配置見(jian)官方(fang)文檔的。

Sink Connector 內部的實現非常(chang)簡單,整(zheng)體(ti)工作流程分為以下幾(ji)個步驟:

  1. Connect 框架根據配置啟動 N 個消費者線程。
  2. N 個消費者同時訂閱數據,并用配置文件中指定的 key.converter 和 value.converter 做反序列化。
  3. Connect 框架把反序列化后的數據傳遞給 N 個 SinkTask 的實例。
  4. SinkTask 使用 TDengine 提供的 schemaless 寫入接口來寫入數據。

上(shang)述(shu) 4 個步驟,只有最(zui)后一步寫數據(ju)是(shi) Sink Connector 需要關心的,其(qi)它都(dou)是(shi) Connect 框架自動實現的。

下面重點討論幾個問(wen)題。

· 支持的數據格式

因為(wei)使用了 schemaless 寫入接口,因此 TDengine Sink Connector 只支持三種格(ge)式的(de)數(shu)據(ju)(ju):、  和 。使用配置項 db.schemaless 來指定寫入時使用的(de)數(shu)據(ju)(ju)格(ge)式。例如:

db.schemaless=line

如果 Kafka 中的(de)(de)數據(ju)已(yi)經是這三種格式之一,那么配置文件中的(de)(de) value.converer,只(zhi)需指(zhi)定為 Connnect 內(nei)置的(de)(de) org.apache.kafka.connect.storage.StringConverter。

value.converter=org.apache.kafka.connect.storage.StringConverter

如果 Kafka 中已有(you)的數據不(bu)是上述(shu)三種(zhong)之一(yi),則需要(yao)實現自己的 Converter 類, 將其轉換為三種(zhong)格(ge)式之一(yi),也許能(neng)幫到你。

· 如何指定 Consumer 的參數?

既然 Connect 框架(jia)已經幫我們(men)做了 Consumer 要做的(de)事,那么(me)我們(men)怎么(me)來控(kong)(kong)制(zhi) Consumer 的(de)行為呢?比(bi)如如何(he)(he)控(kong)(kong)制(zhi) Consumer 訂閱(yue)的(de)主題?如何(he)(he)控(kong)(kong)制(zhi) Consumer 每次(ci) poll 的(de)消(xiao)息數和時間(jian)間(jian)隔?

對于訂閱(yue)哪些主題,可以用配置(zhi)項 topics 來指定(ding)。

如果想(xiang)(xiang)覆蓋(gai) Consumer 的(de)其它默認配(pei)置(zhi),可以直接(jie)在(zai) Sink Connector 的(de)配(pei)置(zhi)文(wen)件中編寫,但是要加前綴 “consumer.override.”,比(bi)如想(xiang)(xiang)把每次(ci) poll 的(de)最大消(xiao)息數(shu)改為(wei) 3000, 可以這樣配(pei)置(zhi):

consumer.override.max.poll.records=3000

· 如何控制寫入線程數?

對于(yu) Kafka Connect Sink,task 本質上就(jiu)是消費者線程(cheng),接收從(cong) topic 的(de)(de)分區(qu)(qu)讀出來的(de)(de)數(shu)(shu)(shu)(shu)(shu)(shu)據。用配置(zhi)參數(shu)(shu)(shu)(shu)(shu)(shu) tasks.max 來控制最大(da)任(ren)務(wu)數(shu)(shu)(shu)(shu)(shu)(shu),一個(ge)(ge)(ge)任(ren)務(wu)一個(ge)(ge)(ge)線程(cheng)。實際(ji)啟(qi)動(dong)的(de)(de)任(ren)務(wu)數(shu)(shu)(shu)(shu)(shu)(shu)還與(yu)(yu) topic 的(de)(de)分區(qu)(qu)數(shu)(shu)(shu)(shu)(shu)(shu)有(you)關。如(ru)(ru)果(guo)你有(you) 10 個(ge)(ge)(ge)分區(qu)(qu),并(bing)且 tasks.max 設置(zhi)為(wei) 5, 那么每(mei)個(ge)(ge)(ge) task 會(hui)收到(dao) 2 個(ge)(ge)(ge)分區(qu)(qu)的(de)(de)數(shu)(shu)(shu)(shu)(shu)(shu)據,并(bing)跟蹤(zong) 2 個(ge)(ge)(ge)分區(qu)(qu)的(de)(de) offsets。如(ru)(ru)果(guo)你配置(zhi)的(de)(de) tasks.max 比 partition 數(shu)(shu)(shu)(shu)(shu)(shu)大(da), Connect 會(hui)啟(qi)動(dong)的(de)(de) task 數(shu)(shu)(shu)(shu)(shu)(shu)與(yu)(yu) topic 的(de)(de) partition 數(shu)(shu)(shu)(shu)(shu)(shu)相同。如(ru)(ru)果(guo)你訂閱了 5 個(ge)(ge)(ge) topic,每(mei)個(ge)(ge)(ge) topic 都(dou)是 1 個(ge)(ge)(ge)分區(qu)(qu), 并(bing)且設置(zhi) tasks.max = 5, 那么實際(ji)會(hui)啟(qi)動(dong)多少個(ge)(ge)(ge)任(ren)務(wu)呢?答(da)案是 1 個(ge)(ge)(ge), 任(ren)務(wu)數(shu)(shu)(shu)(shu)(shu)(shu)與(yu)(yu) topic 數(shu)(shu)(shu)(shu)(shu)(shu)量沒有(you)關系(xi)。

TDengine Sink Connector 使用示例

這(zhe)一(yi)部分(fen)我們在(zai)一(yi)臺 Linux 服務器(qi)上(shang)搭(da)建測試環(huan)境,并(bing)運行簡單(dan)的(de)示(shi)例(li)程序。示(shi)例(li)中將(jiang) Kafka 部署(shu)到了個人的(de) home 目錄(lu)。操作(zuo)時請注意(yi)把(ba)路徑中的(de)用(yong)戶(hu)名(bding)替換為自己的(de)用(yong)戶(hu)名。

· 環境準備

  1. Java 1.8
  2. Maven
  3. 了 TDengine 相關服務進程:taosd 和 taosAdapter。

第一步:安裝 Kafka

wget //dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
tar -xzf kafka_2.13-3.2.0.tgz

編輯 .bash_profile, 加入:

export KAFKA_HOME=/home/bding/kafka_2.13-3.2.0
export PATH=$PATH:$KAFKA_HOME/bin
source .bash_profile

第二步:配置 Kafka

配置 Kafka Connect 加載插件的路徑。

cd kafka_2.13-3.2.0/config/
vi connect-standalone.properties

追加

plugin.path=/home/bding/connectors

修(xiu)改 Connector 插(cha)件的(de)日(ri)志級別。這一步(bu)非常重要,我們將(jiang)通(tong)過插(cha)件的(de)日(ri)志統計同步(bu)數據花費的(de)時間(jian)。

vi connect-log4j.properties

追加

log4j.logger.com.taosdata.kafka.connect.sink=DEBUG

第三步:編譯并安裝插件

git clone git@github.com:taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package
unzip -d ~/connectors target/components/packages/taosdata-kafka-connect-tdengine-*.zip

第四步:啟動 ZooKeeper Server 和 Kafka Server

zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

第五步:創建 topic

kafka-topics.sh --create --topic meters --partitions 1 --bootstrap-server localhost:9092

第六步:生成測試數據

將下列腳(jiao)本(ben)保(bao)存為 gen-data.py:

#!/usr/bin/python3

import random
import sys

topic = sys.argv[1]
count = int(sys.argv[2])

start_ts = 1648432611249000000
location = ["SanFrancisco", "LosAngeles", "SanDiego"]
for i in range(count):
    ts = start_ts + i
    row = f"{topic},location={location[i % 3]},groupid=2 current={random.random() * 10},voltage={random.randint(100, 300)},phase={random.random()} {ts}"
    print(row)

然后執行:

python3 gen-data.py meters 10000  | kafka-console-producer.sh --broker-list localhost:9092 --topic meters

生(sheng)成 10000 條(tiao) InfluxDB 行(xing)協議格式的數(shu)據到 topic meters。每條(tiao)數(shu)據又包含 2 個(ge)標簽字段和 3 個(ge)數(shu)據字段。

第七步:啟動 Kafka Connect

將下列配置保存為 sink-test.properties。

name=TDengineSinkConnector
connector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnector
tasks.max=1
topics=meters
connection.url=jdbc:TAOS://127.0.0.1:6030
connection.user=root
connection.password=taosdata
connection.database=power
db.schemaless=line
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

然后執行:

connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties sink-test.properties

第八步:檢查 TDengine 中的數據

使用 TDengine CLI 查詢 power 數(shu)據(ju)庫 meters 表(biao),檢查是否正好包含 10000 條數(shu)據(ju)。

[bding@vm95 test]$ taos

Welcome to the TDengine shell from Linux, Client Version:2.6.0.4
Copyright (c) 2022 by TAOS Data, Inc. All rights reserved.

taos> select count(*) from power.meters;
       count(*)        |
========================
                 10000 |

TDengine Sink Connector 性能測試

· 測試流程

這一(yi)部分,我們將上面示例步驟中的(de)(de)第四步到第七步封(feng)裝成可重復運行的(de)(de) shell 腳本,并做以下修改:

  1. 將 topic 的分區數作為腳本的第 1 個參數, 同時配置 tasks.max,使其等于分區數。這樣我們可以控制每次測試使用的寫入線程數。
  2. 將生成測試數據的條數作為腳本的第 2 個參數,用來控制每次測試同步的數據量。
  3. 啟動測試前清空所有數據,測試結束后停止 Connect、Kafka 和 ZooKeeper。

每次(ci)測試都先寫數(shu)據到 Kafka,然后(hou)再啟動 Connect 同(tong)(tong)步數(shu)據到 TDengine,這(zhe)(zhe)樣做可以把同(tong)(tong)步數(shu)據的(de)壓(ya)力全部集中到 Sink 插件這(zhe)(zhe)邊。我們統(tong)計 Sink Connector 從(cong)接(jie)收(shou)到第一批(pi)數(shu)據到接(jie)收(shou)到最后(hou)一批(pi)數(shu)據之間的(de)時間,作為同(tong)(tong)步數(shu)據的(de)總耗時。

完整腳本如下:

#!/bin/bash
if [ $# -lt 2 ];then
        echo  "Usage: ./run-test.sh <num_of_partitions>  <total_records>"
        exit 0
fi
echo "---------------------------TEST STARTED---------------------------------------"
echo clean data and logs
taos -s "DROP DATABASE IF EXISTS power"
rm -rf /tmp/kafka-logs /tmp/zookeeper
rm -f $KAFKA_HOME/logs/connect.log

np=$1     # number of partitions
total=$2  # number of records
echo number of partitions is $np, number of recordes is $total.

echo start zookeeper
zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
echo start kafka
sleep 3
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
sleep 5
echo create topic
kafka-topics.sh --create --topic meters --partitions $np --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic meters --bootstrap-server localhost:9092

echo generate test data
python3 gen-data.py meters $total  | kafka-console-producer.sh --broker-list localhost:9092 --topic meters

echo alter connector configuration setting tasks.max=$np
sed -i  "s/tasks.max=.*/tasks.max=${np}/"  sink-test.properties

echo start kafka connect
connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties sink-test.properties

echo -e "\e[1;31m open another console to monitor connect.log. press enter when no more data received.\e[0m"
read

echo stop connect
jps | grep ConnectStandalone | awk '{print $1}' | xargs kill
echo stop kafka server
kafka-server-stop.sh
echo stop zookeeper
zookeeper-server-stop.sh

# extract timestamps of receiving the first batch of data and the last batch of data
grep "records" $KAFKA_HOME/logs/connect.log  | grep meters- > tmp.log

start_time=`cat tmp.log | grep -Eo "[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}" | head -1`
stop_time=`cat tmp.log | grep -Eo "[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}" | tail -1`


echo "--------------------------TEST FINISHED------------------------------------"
echo "| records | partitions | start time | stop time |"
echo "|---------|------------|------------|-----------|"
echo "| $total | $np | $start_time | $stop_time |"

如果要測試使用(yong) 1 個分(fen)區(qu),共 100 萬條數據的(de)性(xing)能,可以這(zhe)樣執行(xing):

./run-test.sh 1 1000000

執行過(guo)程的截(jie)圖如下:

TDengine Database

注意中間有一個交互過程。因為腳本無法確定數(shu)據是(shi)否(fou)同步完,需(xu)要用(yong)戶監控(kong) connect.log 來確定是(shi)否(fou)已經消費完了所有數(shu)據,例(li)如:

[bding@vm95 ~]$ cd kafka_2.13-3.2.0/logs/
[bding@vm95 logs]$ tail -f connect.log
[2022-06-21 17:39:00,176] DEBUG [TDengineSinkConnector|task-0] Received 500 records. First record kafka coordinates:(meters-0-314496). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)
[2022-06-21 17:39:00,180] DEBUG [TDengineSinkConnector|task-0] Received 500 records. First record kafka coordinates:(meters-0-314996). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)

當日(ri)志不再滾(gun)動,就說明(ming)已經消費完了(le)

· 測試結果

寫入速度與數據量和線程數的關系表

TDengine Database

上(shang)表第 1 列為總數(shu)(shu)據(ju)量,第 1 行為消費(fei)者線(xian)程數(shu)(shu),也是寫(xie)入線(xian)程數(shu)(shu)。中間(jian)為平均(jun)每秒寫(xie)入記錄數(shu)(shu)。

寫入速度與數據量和線程數的關系圖

TDengine Database

結果分析

從上(shang)圖可以看出,相同(tong)數據量,線(xian)程(cheng)越多寫(xie)(xie)(xie)入(ru)速度(du)越快。當(dang)使(shi)用單線(xian)程(cheng)寫(xie)(xie)(xie)入(ru)時,每(mei)秒(miao)能寫(xie)(xie)(xie)入(ru)大(da)概 10 萬以上(shang)。當(dang)使(shi)用 5 個(ge)線(xian)程(cheng)寫(xie)(xie)(xie)入(ru)時,每(mei)秒(miao)寫(xie)(xie)(xie)入(ru)大(da)概 35 萬左(zuo)右(you)。當(dang)使(shi)用10 個(ge)線(xian)程(cheng)時,每(mei)秒(miao)能寫(xie)(xie)(xie)入(ru)55 萬左(zuo)右(you)。

寫入速度比較(jiao)平穩,與總數據量關系不(bu)大。

同時(shi)也發現線(xian)程(cheng)增(zeng)加越多,線(xian)程(cheng)增(zeng)加帶來的速度提升越少。線(xian)程(cheng)數從(cong)(cong) 1 變(bian)到 10,速度只從(cong)(cong) 10 萬變(bian)到 50 萬。可能的原(yuan)因是數據(ju)(ju)在各個分(fen)(fen)區分(fen)(fen)布不均勻。有(you)的 task 執(zhi)(zhi)行時(shi)間(jian)長,有(you)的 task 執(zhi)(zhi)行時(shi)間(jian)短,數據(ju)(ju)量越大,數據(ju)(ju)傾斜越大。比(bi)如 1000 萬數據(ju)(ju),10個分(fen)(fen)區的時(shi)候,各分(fen)(fen)區的數據(ju)(ju)量:

[bding@vm95 kafka-logs]$ du -h ./ -d 1
125M    ./meters-8
149M    ./meters-7
119M    ./meters-9
138M    ./meters-4
110M    ./meters-3
158M    ./meters-6
131M    ./meters-5
105M    ./meters-0
113M    ./meters-2
99M     ./meters-1

另一個影響多線程寫入速度的是數據的亂序程度。本測試場景中,多條時間線的數據隨機分配到了不同分區,當(dang)單線程寫(xie)入(ru)時(shi)(即 1 個分區(qu)時(shi)),數(shu)據是嚴(yan)格有(you)序(xu)的,寫(xie)入(ru)速(su)度最快。線程越(yue)多(duo)亂序(xu)程度越(yue)大。

所以在實際應用場景(jing)中,建議將同一(yi)個子表的數據,放在 Kafka 同一(yi)個分(fen)區中。

附錄

· 測試程序

本文中用到的所有代(dai)碼(ma)和原始測(ce)試結(jie)果數據都已上傳到 。

· 測試環境

TDengine Database