小T導讀:天(tian)潤(run)融通是一家云呼叫(jiao)(jiao)中心(xin)服務商(shang),其中CTI-Cloud為大(da)量頭部客戶提供(gong)高效、穩定的呼叫(jiao)(jiao)中心(xin)服務。現在,天(tian)潤(run)通過T-Phone SDK將CTI-Cloud的功能(neng)延伸到移(yi)動(dong)端,為客戶提供(gong)移(yi)動(dong)端的呼叫(jiao)(jiao)服務。
應用場景
在(zai)天(tian)潤的(de)(de)T-Phone SDK中(zhong),我們需要采(cai)集(ji)WebRTC信息來(lai)進(jin)(jin)行數(shu)據的(de)(de)分(fen)(fen)析并作(zuo)出(chu)優(you)化的(de)(de)建議,所以(yi)(yi)需要將(jiang)SDK中(zhong)采(cai)集(ji)到的(de)(de)相關(guan)日(ri)志(zhi)進(jin)(jin)行上報(bao);為(wei)了(le)精簡日(ri)志(zhi)上報(bao)的(de)(de)數(shu)據,我們只針對其中(zhong)的(de)(de)傳輸數(shu)據每隔(ge)5秒(miao)在(zai)雙(shuang)方(fang)接(jie)通(tong)后(hou)上傳,針對傳輸中(zhong)網絡的(de)(de)抖動和鏈接(jie)狀態可(ke)以(yi)(yi)數(shu)據化展示,提供(gong)對每一通(tong)話(hua)的(de)(de)數(shu)據分(fen)(fen)析,以(yi)(yi)便在(zai)后(hou)續SDK演進(jin)(jin)中(zhong)提供(gong)數(shu)據支(zhi)撐;另外(wai)我們對每一通(tong)電話(hua)做(zuo)了(le)操(cao)作(zuo)日(ri)志(zhi),記(ji)(ji)錄了(le)接(jie)口被調(diao)用(yong)的(de)(de)操(cao)作(zuo)和時(shi)間,為(wei)用(yong)戶在(zai)某一通(tong)電話(hua)的(de)(de)操(cao)作(zuo)記(ji)(ji)錄做(zuo)還原,分(fen)(fen)析可(ke)能的(de)(de)誤操(cao)作(zuo)等,為(wei)客戶提供(gong)更好的(de)(de)交互體驗(yan)。
因為(wei)現在(zai)仍(reng)(reng)處于項目初期,我們更關心用(yong)戶在(zai)某一個時間段內(nei)的使用(yong)情況(kuang),在(zai)大量(liang)使用(yong)的場景中是否仍(reng)(reng)然能(neng)保(bao)證較高(gao)的通話質量(liang),同時我們應該盡可能(neng)做(zuo)到對每個座席都可以進行(xing)分析(xi),做(zuo)到每一個座席都應該有自己的數據(ju)表。
舉個例子:如果(guo)我們要查詢企業7000001的座席9001 2020年2月(yue)14日12:00-12:10分(fen)的一(yi)通(tong)(tong)通(tong)(tong)話的WebRTC日志,如果(guo)沒(mei)有按照座席進行分(fen)表,SQL語句應該是這樣:
select log_time, audio_bytes_sent,... from aladdin.webrtc_log where device_id = '70000019001' and where log_time between 1581652800000 and 1581653400000;
如果要提升查詢的速度,我們首先要對device_id和log_time字段(duan)建(jian)立索(suo)引,但是(shi)(shi)當(dang)數(shu)據(ju)量(liang)比較(jiao)大的(de)時候,索(suo)引的(de)存儲也會是(shi)(shi)問題,所以要考(kao)慮分表(我們之前使用的(de)數(shu)據(ju)庫是(shi)(shi)aws的(de)rds,所以沒有分庫的(de)概念)
分表的選擇有兩種,按照時間分表或者按照座席分表。為什么我們要按照座席分表?如果按照時間分表,這樣就會出現不同表的數據量差異過大,甚至存在某個表里沒有數據的情況,因為很少有人半夜做外呼。但是我們也不能這樣武斷的不為半夜的時間段建立表,萬一人打的是國際長途呢?但是一個座席不可能存在不外呼的情況,而且對于移動端的應用,我們在排查問題時更多是通過某個座席向我們反饋發生的問題,我們再針對這個座席進行排查,所以在查詢的時候device_id這個字段是必須要體現的,如果按照device_id進行分(fen)表(biao)(biao),我們在查詢的(de)時候就不再需要對這個(ge)字段(duan)建立索引了。因而選擇(ze)按(an)照座(zuo)席(xi)進行分(fen)表(biao)(biao)。
如果要(yao)使(shi)用傳統的數(shu)據庫(ku)做分表(biao)(biao),我們在插(cha)(cha)入(ru)數(shu)據之前(qian)一定要(yao)先(xian)判(pan)斷這(zhe)張表(biao)(biao)是否存在,同時我們還需(xu)要(yao)提(ti)前(qian)創建(jian)好這(zhe)些(xie)表(biao)(biao)。這(zhe)種步驟在我看來就顯得很(hen)雞(ji)肋。如果能有數(shu)據庫(ku)可以做到(dao)在插(cha)(cha)入(ru)數(shu)據時指定表(biao)(biao)名,如果存在則(ze)插(cha)(cha)入(ru),如果不存在則(ze)自動創建(jian)表(biao)(biao),這(zhe)樣就方(fang)便多了。
日志上報的整體處理流程
整個流程需要T-Phone SDK,CTI-Cloud的Interface模塊(CTI-Cloud對客戶開放(fang)的接口)和日志上報模塊相互協作

設計
考慮到日志上報的(de)(de)頻率較(jiao)高,對IO吞吐的(de)(de)要求比(bi)較(jiao)高。我(wo)們(men)可以通過全異步(bu)的(de)(de)方式進(jin)行數據的(de)(de)采集(ji)。這次(ci)使(shi)用了作為全異步(bu)項目開發的(de)(de)工(gong)具。
在數據存儲上基于以下幾點考慮我們選擇了TDengine Database:
1. 不管是WebRTC日志還是操作日志,都是按照時間產生的數據流。而TDengine正好是一個專門為物聯網結構化數據流設計的時序數據庫。
2. WebRTC日志(zhi)和(he)操(cao)作日志(zhi)存儲的(de)數(shu)據(ju)(ju)(ju)格式都(dou)是(shi)一致的(de),但是(shi)如(ru)果要(yao)做(zuo)到(dao)都(dou)每(mei)個(ge)使用(yong)的(de)座席都(dou)可以進(jin)行分析,最好的(de)方式是(shi)每(mei)個(ge)座席都(dou)能(neng)有一張(zhang)自己的(de)數(shu)據(ju)(ju)(ju)表(biao)(biao)。TDengine提供了(le)(le)超級表(biao)(biao),在(zai)超級表(biao)(biao)中定(ding)(ding)義數(shu)據(ju)(ju)(ju)結構,并按(an)照tag區(qu)分,只要(yao)在(zai)插入數(shu)據(ju)(ju)(ju)時指定(ding)(ding)表(biao)(biao)名即可做(zuo)到(dao)分表(biao)(biao)。顯然解決了(le)(le)上(shang)述的(de)雞肋問題(ti)。按(an)照TDengine官網上(shang)的(de)介紹:
為充(chong)分利用其數(shu)(shu)據(ju)的時(shi)序性和其他數(shu)(shu)據(ju)特點(dian)(dian),TDengine要求(qiu)對每(mei)個數(shu)(shu)據(ju)采(cai)集點(dian)(dian)單獨建表(biao)。
其(qi)實我們(men)的(de)座(zuo)席就相當(dang)于(yu)是一個獨立的(de)數據采集點,TDengine在我們(men)的(de)場景中是很貼合業務的(de)。
3. 時(shi)間。時(shi)間也是(shi)(shi)我(wo)們(men)在查詢中重點關注(zhu)的(de)(de)部(bu)分(fen),在傳統的(de)(de)數據(ju)庫(ku)中,我(wo)們(men)需(xu)要通過對字段建(jian)立(li)索(suo)引(yin)(yin)來提(ti)升查詢速(su)度(du),可是(shi)(shi)我(wo)們(men)仍然不想建(jian)立(li)索(suo)引(yin)(yin),因為(wei)索(suo)引(yin)(yin)仍需(xu)要占用存(cun)儲空(kong)間,我(wo)們(men)是(shi)(shi)否可以通過類(lei)似分(fen)表的(de)(de)方式來取代索(suo)引(yin)(yin)呢?答案是(shi)(shi)肯定的(de)(de):
TDengine中寫入的(de)(de)(de)(de)數據(ju)在(zai)硬(ying)盤上是按時(shi)間(jian)維(wei)度進行(xing)分(fen)片的(de)(de)(de)(de)。同(tong)一(yi)(yi)個(ge)vnode中的(de)(de)(de)(de)表在(zai)同(tong)一(yi)(yi)時(shi)間(jian)范圍內的(de)(de)(de)(de)數據(ju)都存(cun)放在(zai)同(tong)一(yi)(yi)文(wen)件組中。這一(yi)(yi)數據(ju)分(fen)片方式可以大(da)大(da)簡化數據(ju)在(zai)時(shi)間(jian)維(wei)度的(de)(de)(de)(de)查詢,提高查詢速度。在(zai)默認(ren)配置下(xia),硬(ying)盤上的(de)(de)(de)(de)每個(ge)數據(ju)文(wen)件存(cun)放10天(tian)數據(ju)。用(yong)戶可根據(ju)需要(yao)修改系統配置參數daysPerFile進行(xing)個(ge)性化配置。
4. 插入和查詢的速度要快,穩(wen)定。
在我(wo)們的(de)開發服務(wu)器上(shang)嘗(chang)試了一下TDengine Database。和(he)官網(wang)上(shang)介(jie)紹的(de)出(chu)入不大,查詢和(he)存(cun)(cun)儲速(su)度確實很快,而且也不依賴(lai)其他文件(jian)系統,所(suo)以(yi)就(jiu)使用TDengine作為這個(ge)模塊(kuai)的(de)存(cun)(cun)儲引擎(qing)。由于TDengine中對列有(you)長度限制,最長4096,而且我(wo)們上(shang)報(bao)的(de)字段比(bi)較多,所(suo)以(yi)盡量分配好(hao)每個(ge)字段的(de)長度。
在數(shu)據(ju)的(de)采(cai)集過程中(zhong)(zhong),TPhone SDK不(bu)會(hui)直接和(he)我(wo)(wo)們(men)進行數(shu)據(ju)交(jiao)互,而是會(hui)先將(jiang)數(shu)據(ju)存儲到中(zhong)(zhong),我(wo)(wo)們(men)再從SQS中(zhong)(zhong)拉取數(shu)據(ju),然后對(dui)數(shu)據(ju)處理(li)后進行存儲。
先來創(chuang)建(jian)一(yi)個超級(ji)(ji)表(biao)(biao),tdengine提供(gong)的超級(ji)(ji)表(biao)(biao)在我(wo)看(kan)來還是很方(fang)便的,我(wo)們可以直接利用超級(ji)(ji)表(biao)(biao)來做到自動的對數據進行分表(biao)(biao)存儲(chu)。
create database aladdin;
use aladdin;
create table webrtc_log(
createTime timestamp,
deviceId binary(100),
audioBytesSent bigint,
audioBytesReceived bigint,
...
ssrcSendGoogCurrentDelayMs int,
ssrcSendGoogJitterBufferMs int
) tags (
deviceIdTag binary(100)
);
TDengine提供了非(fei)常多的(de)連接方(fang)式(shi)(shi),為了更好的(de)配合Vertx進(jin)行異步(bu)存儲(chu),我們(men)在這里使用了Rest方(fang)式(shi)(shi)進(jin)行數據庫(ku)操作。
開始
在有了整(zheng)體思路之后我們開(kai)始上手開(kai)發:
1. 應用配置:
{
"aws.region": "<your aws region>",
"aws.accessKey": "<your aws ak>",
"aws.secretAccessKey": "<your aws sk>",
"aladdin.maxPool": 100,
"aladdin.maxWaitQueue": 1500,
"aladdin.queue.name": ["queuename1","queuename2"],
"aladdin.cache.expireAfterWrite": 30,
"aladdin.cache.expireAfterAccess": 30,
"tdengine.host": "<your tdengine host>",
"tdengine.port": 6020,
"tdengine.user": "root",
"tdengine.password": "<your tdengine password>"
}
2. 重寫(xie)Launcher
import com.tinet.twatch.aladdin.config.Configurer;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Launcher;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.SLF4JLogDelegateFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author qianwj
* @since v1.0
*/
public class AladdinLauncher extends Launcher {
private static Configurer configurer = new Configurer();
private Logger logger = LoggerFactory.getLogger(AladdinLauncher.class);
public static void main(String[] args) {
System.setProperty("vertx.logger-delegate-factory-class-name", SLF4JLogDelegateFactory.class.getName());
new AladdinLauncher().dispatch(args);
}
@Override
public void beforeDeployingVerticle(DeploymentOptions deploymentOptions) {
logger.info("Loading config starting...");
JsonObject config = configurer.load();
JsonObject local = deploymentOptions.getConfig();
if (!config.isEmpty()) { // 將consul配置注入到context中
local.mergeIn(config);
deploymentOptions.setConfig(local);
}
super.beforeDeployingVerticle(deploymentOptions);
logger.info("Loading config completed, config: {}", deploymentOptions.getConfig());
}
@Override
public void afterConfigParsed(JsonObject config) {
logger.info("Loading local config complete, local config: {}", config.encodePrettily());
}
@Override
public void handleDeployFailed(Vertx vertx, String mainVerticle, DeploymentOptions deploymentOptions, Throwable cause) {
logger.error("Deploy verticle occur exception: {}, App will be closed immediately!", cause.getLocalizedMessage(), cause);
vertx.close();
}
}
其實寫完(wan)第二步(bu)就(jiu)可以(yi)知道(dao)這個配(pei)置(zhi)(zhi)文(wen)件存在(zai)不是必要(yao)(yao)的,我們使用了Consul作為配(pei)置(zhi)(zhi)中心(xin)來進行(xing)集中配(pei)置(zhi)(zhi),這一步(bu)主要(yao)(yao)是為了注入consul的配(pei)置(zhi)(zhi)以(yi)及加載日志。
3. 拉(la)取SQS中的數(shu)據
import com.amazonaws.AmazonServiceException;
import com.tinet.ctilink.yun.entity.YunMessage;
import com.tinet.twatch.aladdin.service.AwsSQSService;
import com.tinet.twatch.aladdin.config.Configurer;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class DataCollectVerticle extends AbstractVerticle {
private Logger logger = LoggerFactory.getLogger(DataCollectVerticle.class);
private volatile boolean shutdown = false;
@Override
public void start() throws Exception {
logger.info("DataCollectVerticle starting...");
AwsSQSService sqsService = Configurer.sqsService();
EventBus bus = vertx.eventBus();
vertx.setPeriodic(1000, id -> {
try {
if (shutdown) {
vertx.cancelTimer(id);
}
JsonArray array = config().getJsonArray(Configurer.QUEUE_URL);
List<YunMessage> msgs = sqsService.receiveMessageAndDelete(array.getString(0));
List<YunMessage> userActionMsgs = sqsService.receiveMessageAndDelete(array.getString(1));
bus.send(Configurer.CHANNEL_ADDRESS, Json.encode(msgs));
} catch (AmazonServiceException e) {
logger.warn("msgs received failed, cause: {}", e.getLocalizedMessage(), e);
}
});
}
@Override
public void stop() throws Exception {
shutdown = true;
logger.info("DataCollectVerticle closing...");
}
}
4. 將數(shu)據(ju)存儲到(dao)TDengine中
import com.github.benmanes.caffeine.cache.Cache;
import com.tinet.ctilink.yun.entity.YunMessage;
import com.tinet.twatch.aladdin.DataOperator;
import com.tinet.twatch.aladdin.config.Configurer;
import com.tinet.twatch.aladdin.model.WebRTCLog;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class SaveVerticle extends AbstractVerticle {
private Logger logger = LoggerFactory.getLogger(SaveVerticle.class);
@Override
public void start() throws Exception {
logger.info("SaveVerticle starting....");
// 從event bus接收數據
EventBus bus = vertx.eventBus();
bus.consumer(Configurer.CHANNEL_ADDRESS, (Handler<Message<String>>) msg -> {
JsonArray coming = new JsonArray(msg.body());
if (coming != null)
save(coming);
});
}
private void save(JsonArray array) {
WebClient client = Configurer.tdClient();
List<WebRTCLog> data = new ArrayList<>();
Cache<String, WebRTCLog> cache = Configurer.cache();
if (array.size() > 0) {
final WebRTCLog empty = new WebRTCLog();
for (int i = 0; i < array.size(); i++) {
String message = array.getJsonObject(i).mapTo(YunMessage.class).getBody();
try {
JsonObject json = DataOperator.toJsonObject(message);
WebRTCLog log = json.mapTo(WebRTCLog.class);
String cacheKey = log.getDeviceId();
WebRTCLog org = cache.get(cacheKey, k -> empty);
if (!Objects.equals(org, empty)) { // 如果不是第一次插入
DataOperator.merge(log, org);
}
cache.put(cacheKey, log);
data.add(log);
} catch (Exception e) {
logger.error("log saved failed, cause: {}", e.getLocalizedMessage(), e);
}
}
client.post("/rest/sql")
.basicAuthentication(config().getString("tdengine.user"), config().getString("tdengine.password"))
.sendBuffer(insert(data), ar -> {
if (ar.succeeded()) {
HttpResponse<Buffer> response = ar.result();
if (response != null) {
JsonObject res = response.bodyAsJsonObject();
if (!"succ".equals(res.getString("status"))) {
logger.warn("data insert failed! data: {}, cause: {}", Json.encode(data), res.getString("desc"));
}
}
} else {
logger.error("data insert failed! {}", Json.encode(data), ar.cause());
}
});
}
}
private Buffer insert(WebRTCLog log) throws Exception {
String formatter = "INSERT INTO ALADDIN.WEBRTC_LOG_%s " +
" USING ALADDIN.WEBRTC_LOG TAGS(%s) " +
"VALUES(%s)";
String sql = String.format(formatter, log.getDeviceId(), log.getDeviceId(), DataOperator.valToSql(log));
return Buffer.buffer(sql);
}
private Buffer insert(List<WebRTCLog> data) throws IllegalAccessException {
StringBuilder sqlBuilder = new StringBuilder("INSERT INTO ");
String formatter = "ALADDIN.WEBRTC_LOG_%s USING ALADDIN.WEBRTC_LOG TAGS(%s) VALUES(%s) ";
for (WebRTCLog log : data) {
sqlBuilder.append(String.format(formatter, log.getDeviceId(), log.getDeviceId(), DataOperator.valToSql(log)));
}
return Buffer.buffer(sqlBuilder.toString());
}
@Override
public void stop() throws Exception {
logger.info("SaveVerticle closing....");
}
}
5. 部署Verticle
import com.tinet.twatch.aladdin.config.Configurer;
import com.tinet.twatch.aladdin.verticle.DataCollectVerticle;
import com.tinet.twatch.aladdin.verticle.SaveVerticle;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.client.WebClientOptions;
public class MainVerticle extends AbstractVerticle {
private Logger logger = LoggerFactory.getLogger(MainVerticle.class);
@Override
public void start(Promise<Void> startPromise) throws Exception {
logger.info("MainVerticle starting...");
// 初始化sqs
String region = config().getString("aws.region");
String accessKey = config().getString("aws.accessKey");
String secretKey = config().getString("aws.secretAccessKey");
Configurer.initSQSService(region, accessKey, secretKey, config().getJsonArray(Configurer.QUEUE_URL));
DeploymentOptions dataCollectDeploymentOptions = new DeploymentOptions();
dataCollectDeploymentOptions.setInstances(1);
dataCollectDeploymentOptions.setConfig(config());
dataCollectDeploymentOptions.setWorker(true);
Configurer.initCache(config().getInteger("aladdin.cache.expireAfterWrite"), config().getInteger("aladdin.cache.expireAfterAccess"));
vertx.deployVerticle(DataCollectVerticle.class.getName(), dataCollectDeploymentOptions, ar -> {
if (ar.succeeded()) {
logger.info("DataCollectVerticle started!");
} else {
logger.warn("DataCollectVerticle deploy failed! {}", ar.cause().getLocalizedMessage(), ar.cause());
}
});
// 初始化webclient
WebClientOptions options = new WebClientOptions();
options.setMaxWaitQueueSize(config().getInteger("aladdin.maxWaitQueue"));
options.setMaxPoolSize(config().getInteger("aladdin.maxPool"));
options.setDefaultHost(config().getString("tdengine.host"));
options.setDefaultPort(config().getInteger("tdengine.port"));
Configurer.initTDClient(vertx, options);
DeploymentOptions saveDeploymentOptions = new DeploymentOptions();
saveDeploymentOptions.setInstances(1);
saveDeploymentOptions.setConfig(config());
vertx.deployVerticle(SaveVerticle.class.getName(), saveDeploymentOptions, ar -> {
if (ar.succeeded()) {
logger.info("SaveVerticle started!");
} else {
logger.warn("SaveVerticle deploy failed!");
}
});
}
}
這樣就快速實(shi)現(xian)了一(yi)個(ge)日志上報的模塊,且(qie)多個(ge)實(shi)例部署時相(xiang)互之間不會產生影響,當然在實(shi)際的生產環境中(zhong),我們(men)需要(yao)考(kao)慮的會更多。
當然,日(ri)志上報只是開始。在(zai)(zai)之后的項目開發中,我還會繼(ji)續(xu)向大(da)家(jia)介紹TDengine Database在(zai)(zai)數據分析中的應用實踐,感謝觀看(kan)。
作者簡介:錢(qian)文錦(jin) ,天潤融通基礎(chu)研發(fa)部研發(fa)工程師(shi),開源社(she)區愛好者,目前(qian)主要負責天潤融通T-Phone SDK/CTI-Cloud相關(guan)功(gong)能開發(fa)和應用。
本文首發于:


























