无码人妻精品一区二区三18禁,影音先锋男人AV橹橹色,污污污污污污www网站免费,日韩成人av无码一区二区三区,欧美性受xxxx狂喷水

TDengine在WebRTC日志上報中的實踐

天潤融通 錢文錦

2020-02-21 / ,

小T導讀:天(tian)潤(run)融通是一家云呼叫(jiao)(jiao)中心(xin)服務商(shang),其中CTI-Cloud為大(da)量頭部客戶提供(gong)高效、穩定的呼叫(jiao)(jiao)中心(xin)服務。現在,天(tian)潤(run)通過T-Phone SDK將CTI-Cloud的功能(neng)延伸到移(yi)動(dong)端,為客戶提供(gong)移(yi)動(dong)端的呼叫(jiao)(jiao)服務。

應用場景

在(zai)天(tian)潤的(de)(de)T-Phone SDK中(zhong),我們需要采(cai)集(ji)WebRTC信息來(lai)進(jin)(jin)行數(shu)據的(de)(de)分(fen)(fen)析并作(zuo)出(chu)優(you)化的(de)(de)建議,所以(yi)(yi)需要將(jiang)SDK中(zhong)采(cai)集(ji)到的(de)(de)相關(guan)日(ri)志(zhi)進(jin)(jin)行上報(bao);為(wei)了(le)精簡日(ri)志(zhi)上報(bao)的(de)(de)數(shu)據,我們只針對其中(zhong)的(de)(de)傳輸數(shu)據每隔(ge)5秒(miao)在(zai)雙(shuang)方(fang)接(jie)通(tong)后(hou)上傳,針對傳輸中(zhong)網絡的(de)(de)抖動和鏈接(jie)狀態可(ke)以(yi)(yi)數(shu)據化展示,提供(gong)對每一通(tong)話(hua)的(de)(de)數(shu)據分(fen)(fen)析,以(yi)(yi)便在(zai)后(hou)續SDK演進(jin)(jin)中(zhong)提供(gong)數(shu)據支(zhi)撐;另外(wai)我們對每一通(tong)電話(hua)做(zuo)了(le)操(cao)作(zuo)日(ri)志(zhi),記(ji)(ji)錄了(le)接(jie)口被調(diao)用(yong)的(de)(de)操(cao)作(zuo)和時(shi)間,為(wei)用(yong)戶在(zai)某一通(tong)電話(hua)的(de)(de)操(cao)作(zuo)記(ji)(ji)錄做(zuo)還原,分(fen)(fen)析可(ke)能的(de)(de)誤操(cao)作(zuo)等,為(wei)客戶提供(gong)更好的(de)(de)交互體驗(yan)。

因為(wei)現在(zai)仍(reng)(reng)處于項目初期,我們更關心用(yong)戶在(zai)某一個時間段內(nei)的使用(yong)情況(kuang),在(zai)大量(liang)使用(yong)的場景中是否仍(reng)(reng)然能(neng)保(bao)證較高(gao)的通話質量(liang),同時我們應該盡可能(neng)做(zuo)到對每個座席都可以進行(xing)分析(xi),做(zuo)到每一個座席都應該有自己的數據(ju)表。

舉個例子:如果(guo)我們要查詢企業7000001的座席9001 2020年2月(yue)14日12:00-12:10分(fen)的一(yi)通(tong)(tong)通(tong)(tong)話的WebRTC日志,如果(guo)沒(mei)有按照座席進行分(fen)表,SQL語句應該是這樣:

select log_time, audio_bytes_sent,...  from aladdin.webrtc_log where device_id = '70000019001' and where log_time between 1581652800000 and 1581653400000;

如果要提升查詢的速度,我們首先要對device_idlog_time字段(duan)建(jian)立索(suo)引,但是(shi)(shi)當(dang)數(shu)據(ju)量(liang)比較(jiao)大的(de)時候,索(suo)引的(de)存儲也會是(shi)(shi)問題,所以要考(kao)慮分表(我們之前使用的(de)數(shu)據(ju)庫是(shi)(shi)aws的(de)rds,所以沒有分庫的(de)概念)

分表的選擇有兩種,按照時間分表或者按照座席分表。為什么我們要按照座席分表?如果按照時間分表,這樣就會出現不同表的數據量差異過大,甚至存在某個表里沒有數據的情況,因為很少有人半夜做外呼。但是我們也不能這樣武斷的不為半夜的時間段建立表,萬一人打的是國際長途呢?但是一個座席不可能存在不外呼的情況,而且對于移動端的應用,我們在排查問題時更多是通過某個座席向我們反饋發生的問題,我們再針對這個座席進行排查,所以在查詢的時候device_id這個字段是必須要體現的,如果按照device_id進行分(fen)表(biao)(biao),我們在查詢的(de)時候就不再需要對這個(ge)字段(duan)建立索引了。因而選擇(ze)按(an)照座(zuo)席(xi)進行分(fen)表(biao)(biao)。

如果要(yao)使(shi)用傳統的數(shu)據庫(ku)做分表(biao)(biao),我們在插(cha)(cha)入(ru)數(shu)據之前(qian)一定要(yao)先(xian)判(pan)斷這(zhe)張表(biao)(biao)是否存在,同時我們還需(xu)要(yao)提(ti)前(qian)創建(jian)好這(zhe)些(xie)表(biao)(biao)。這(zhe)種步驟在我看來就顯得很(hen)雞(ji)肋。如果能有數(shu)據庫(ku)可以做到(dao)在插(cha)(cha)入(ru)數(shu)據時指定表(biao)(biao)名,如果存在則(ze)插(cha)(cha)入(ru),如果不存在則(ze)自動創建(jian)表(biao)(biao),這(zhe)樣就方(fang)便多了。

日志上報的整體處理流程

整個流程需要T-Phone SDK,CTI-Cloud的Interface模塊(CTI-Cloud對客戶開放(fang)的接口)和日志上報模塊相互協作

TDengine在WebRTC日志上報中的實踐 - TDengine Database 時序數據庫

設計

考慮到日志上報的(de)(de)頻率較(jiao)高,對IO吞吐的(de)(de)要求比(bi)較(jiao)高。我(wo)們(men)可以通過全異步(bu)的(de)(de)方式進(jin)行數據的(de)(de)采集(ji)。這次(ci)使(shi)用了作為全異步(bu)項目開發的(de)(de)工(gong)具。

在數據存儲上基于以下幾點考慮我們選擇了TDengine Database

1. 不管是WebRTC日志還是操作日志,都是按照時間產生的數據流。而TDengine正好是一個專門為物聯網結構化數據流設計的時序數據庫

2. WebRTC日志(zhi)和(he)操(cao)作日志(zhi)存儲的(de)數(shu)據(ju)(ju)(ju)格式都(dou)是(shi)一致的(de),但是(shi)如(ru)果要(yao)做(zuo)到(dao)都(dou)每(mei)個(ge)使用(yong)的(de)座席都(dou)可以進(jin)行分析,最好的(de)方式是(shi)每(mei)個(ge)座席都(dou)能(neng)有一張(zhang)自己的(de)數(shu)據(ju)(ju)(ju)表(biao)(biao)。TDengine提供了(le)(le)超級表(biao)(biao),在(zai)超級表(biao)(biao)中定(ding)(ding)義數(shu)據(ju)(ju)(ju)結構,并按(an)照tag區(qu)分,只要(yao)在(zai)插入數(shu)據(ju)(ju)(ju)時指定(ding)(ding)表(biao)(biao)名即可做(zuo)到(dao)分表(biao)(biao)。顯然解決了(le)(le)上(shang)述的(de)雞肋問題(ti)。按(an)照TDengine官網上(shang)的(de)介紹:

為充(chong)分利用其數(shu)(shu)據(ju)的時(shi)序性和其他數(shu)(shu)據(ju)特點(dian)(dian),TDengine要求(qiu)對每(mei)個數(shu)(shu)據(ju)采(cai)集點(dian)(dian)單獨建表(biao)。

其(qi)實我們(men)的(de)座(zuo)席就相當(dang)于(yu)是一個獨立的(de)數據采集點,TDengine在我們(men)的(de)場景中是很貼合業務的(de)。

3. 時(shi)間。時(shi)間也是(shi)(shi)我(wo)們(men)在查詢中重點關注(zhu)的(de)(de)部(bu)分(fen),在傳統的(de)(de)數據(ju)庫(ku)中,我(wo)們(men)需(xu)要通過對字段建(jian)立(li)索(suo)引(yin)(yin)來提(ti)升查詢速(su)度(du),可是(shi)(shi)我(wo)們(men)仍然不想建(jian)立(li)索(suo)引(yin)(yin),因為(wei)索(suo)引(yin)(yin)仍需(xu)要占用存(cun)儲空(kong)間,我(wo)們(men)是(shi)(shi)否可以通過類(lei)似分(fen)表的(de)(de)方式來取代索(suo)引(yin)(yin)呢?答案是(shi)(shi)肯定的(de)(de):

TDengine中寫入的(de)(de)(de)(de)數據(ju)在(zai)硬(ying)盤上是按時(shi)間(jian)維(wei)度進行(xing)分(fen)片的(de)(de)(de)(de)。同(tong)一(yi)(yi)個(ge)vnode中的(de)(de)(de)(de)表在(zai)同(tong)一(yi)(yi)時(shi)間(jian)范圍內的(de)(de)(de)(de)數據(ju)都存(cun)放在(zai)同(tong)一(yi)(yi)文(wen)件組中。這一(yi)(yi)數據(ju)分(fen)片方式可以大(da)大(da)簡化數據(ju)在(zai)時(shi)間(jian)維(wei)度的(de)(de)(de)(de)查詢,提高查詢速度。在(zai)默認(ren)配置下(xia),硬(ying)盤上的(de)(de)(de)(de)每個(ge)數據(ju)文(wen)件存(cun)放10天(tian)數據(ju)。用(yong)戶可根據(ju)需要(yao)修改系統配置參數daysPerFile進行(xing)個(ge)性化配置。

4. 插入和查詢的速度要快,穩(wen)定。

在我(wo)們的(de)開發服務(wu)器上(shang)嘗(chang)試了一下TDengine Database。和(he)官網(wang)上(shang)介(jie)紹的(de)出(chu)入不大,查詢和(he)存(cun)(cun)儲速(su)度確實很快,而且也不依賴(lai)其他文件(jian)系統,所(suo)以(yi)就(jiu)使用TDengine作為這個(ge)模塊(kuai)的(de)存(cun)(cun)儲引擎(qing)。由于TDengine中對列有(you)長度限制,最長4096,而且我(wo)們上(shang)報(bao)的(de)字段比(bi)較多,所(suo)以(yi)盡量分配好(hao)每個(ge)字段的(de)長度。

在數(shu)據(ju)的(de)采(cai)集過程中(zhong)(zhong),TPhone SDK不(bu)會(hui)直接和(he)我(wo)(wo)們(men)進行數(shu)據(ju)交(jiao)互,而是會(hui)先將(jiang)數(shu)據(ju)存儲到中(zhong)(zhong),我(wo)(wo)們(men)再從SQS中(zhong)(zhong)拉取數(shu)據(ju),然后對(dui)數(shu)據(ju)處理(li)后進行存儲。

先來創(chuang)建(jian)一(yi)個超級(ji)(ji)表(biao)(biao),tdengine提供(gong)的超級(ji)(ji)表(biao)(biao)在我(wo)看(kan)來還是很方(fang)便的,我(wo)們可以直接利用超級(ji)(ji)表(biao)(biao)來做到自動的對數據進行分表(biao)(biao)存儲(chu)。

create database aladdin;

use aladdin;

create table webrtc_log(
 createTime timestamp,
 deviceId binary(100),
 audioBytesSent bigint,
 audioBytesReceived bigint,
 ...
 ssrcSendGoogCurrentDelayMs int,
 ssrcSendGoogJitterBufferMs int
) tags (
  deviceIdTag binary(100)
);

TDengine提供了非(fei)常多的(de)連接方(fang)式(shi)(shi),為了更好的(de)配合Vertx進(jin)行異步(bu)存儲(chu),我們(men)在這里使用了Rest方(fang)式(shi)(shi)進(jin)行數據庫(ku)操作。

開始

在有了整(zheng)體思路之后我們開(kai)始上手開(kai)發:

1. 應用配置:

{
  "aws.region": "<your aws region>",
  "aws.accessKey": "<your aws ak>",
  "aws.secretAccessKey": "<your aws sk>",
  "aladdin.maxPool": 100,
  "aladdin.maxWaitQueue": 1500,
  "aladdin.queue.name": ["queuename1","queuename2"],
  "aladdin.cache.expireAfterWrite": 30,
  "aladdin.cache.expireAfterAccess": 30,
  "tdengine.host": "<your tdengine host>",
  "tdengine.port": 6020,
  "tdengine.user": "root",
  "tdengine.password": "<your tdengine password>"
}

2. 重寫(xie)Launcher

import com.tinet.twatch.aladdin.config.Configurer;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Launcher;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.SLF4JLogDelegateFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author qianwj
 * @since v1.0
 */
public class AladdinLauncher extends Launcher {

  private static Configurer configurer = new Configurer();

  private Logger logger = LoggerFactory.getLogger(AladdinLauncher.class);

  public static void main(String[] args) {
    System.setProperty("vertx.logger-delegate-factory-class-name", SLF4JLogDelegateFactory.class.getName());
    new AladdinLauncher().dispatch(args);
  }

  @Override
  public void beforeDeployingVerticle(DeploymentOptions deploymentOptions) {
    logger.info("Loading config starting...");
    JsonObject config = configurer.load();
    JsonObject local = deploymentOptions.getConfig();
    if (!config.isEmpty()) { // 將consul配置注入到context中
      local.mergeIn(config);
      deploymentOptions.setConfig(local);
    }
    super.beforeDeployingVerticle(deploymentOptions);
    logger.info("Loading config completed, config: {}", deploymentOptions.getConfig());
  }

  @Override
  public void afterConfigParsed(JsonObject config) {
    logger.info("Loading local config complete, local config: {}", config.encodePrettily());
  }

  @Override
  public void handleDeployFailed(Vertx vertx, String mainVerticle, DeploymentOptions deploymentOptions, Throwable cause) {
    logger.error("Deploy verticle occur exception: {}, App will be closed immediately!", cause.getLocalizedMessage(), cause);
    vertx.close();
  }
}

其實寫完(wan)第二步(bu)就(jiu)可以(yi)知道(dao)這個配(pei)置(zhi)(zhi)文(wen)件存在(zai)不是必要(yao)(yao)的,我們使用了Consul作為配(pei)置(zhi)(zhi)中心(xin)來進行(xing)集中配(pei)置(zhi)(zhi),這一步(bu)主要(yao)(yao)是為了注入consul的配(pei)置(zhi)(zhi)以(yi)及加載日志。

3. 拉(la)取SQS中的數(shu)據

import com.amazonaws.AmazonServiceException;
import com.tinet.ctilink.yun.entity.YunMessage;
import com.tinet.twatch.aladdin.service.AwsSQSService;
import com.tinet.twatch.aladdin.config.Configurer;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class DataCollectVerticle extends AbstractVerticle {

  private Logger logger = LoggerFactory.getLogger(DataCollectVerticle.class);

  private volatile boolean shutdown = false;

  @Override
  public void start() throws Exception {
    logger.info("DataCollectVerticle starting...");
    AwsSQSService sqsService = Configurer.sqsService();
    EventBus bus = vertx.eventBus();
    vertx.setPeriodic(1000, id -> {
      try {
        if (shutdown) {
          vertx.cancelTimer(id);
        }
        JsonArray array = config().getJsonArray(Configurer.QUEUE_URL);
        List<YunMessage> msgs = sqsService.receiveMessageAndDelete(array.getString(0));
        List<YunMessage> userActionMsgs = sqsService.receiveMessageAndDelete(array.getString(1));
        bus.send(Configurer.CHANNEL_ADDRESS, Json.encode(msgs));
      } catch (AmazonServiceException e) {
        logger.warn("msgs received failed, cause: {}", e.getLocalizedMessage(), e);
      }
    });
  }



  @Override
  public void stop() throws Exception {
    shutdown = true;
    logger.info("DataCollectVerticle closing...");
  }
}

4. 將數(shu)據(ju)存儲到(dao)TDengine中

import com.github.benmanes.caffeine.cache.Cache;
import com.tinet.ctilink.yun.entity.YunMessage;
import com.tinet.twatch.aladdin.DataOperator;
import com.tinet.twatch.aladdin.config.Configurer;
import com.tinet.twatch.aladdin.model.WebRTCLog;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class SaveVerticle extends AbstractVerticle {

  private Logger logger = LoggerFactory.getLogger(SaveVerticle.class);

  @Override
  public void start() throws Exception {
    logger.info("SaveVerticle starting....");

    // 從event bus接收數據
    EventBus bus = vertx.eventBus();
    bus.consumer(Configurer.CHANNEL_ADDRESS, (Handler<Message<String>>) msg -> {
      JsonArray coming = new JsonArray(msg.body());
      if (coming != null)
        save(coming);
    });
  }

  private void save(JsonArray array) {
    WebClient client = Configurer.tdClient();
    List<WebRTCLog> data = new ArrayList<>();
    Cache<String, WebRTCLog> cache = Configurer.cache();
    if (array.size() > 0) {
      final WebRTCLog empty = new WebRTCLog();
      for (int i = 0; i < array.size(); i++) {
        String message = array.getJsonObject(i).mapTo(YunMessage.class).getBody();
        try {
          JsonObject json = DataOperator.toJsonObject(message);
          WebRTCLog log = json.mapTo(WebRTCLog.class);
          String cacheKey = log.getDeviceId();
          WebRTCLog org = cache.get(cacheKey, k -> empty);
          if (!Objects.equals(org, empty)) { // 如果不是第一次插入
            DataOperator.merge(log, org);
          }
          cache.put(cacheKey, log);
          data.add(log);
        } catch (Exception e) {
          logger.error("log saved failed, cause: {}", e.getLocalizedMessage(), e);
        }
      }
      client.post("/rest/sql")
        .basicAuthentication(config().getString("tdengine.user"), config().getString("tdengine.password"))
        .sendBuffer(insert(data), ar -> {
            if (ar.succeeded()) {
                HttpResponse<Buffer> response = ar.result();
                if (response != null) {
                    JsonObject res = response.bodyAsJsonObject();
                    if (!"succ".equals(res.getString("status"))) {
                        logger.warn("data insert failed! data: {}, cause: {}", Json.encode(data), res.getString("desc"));
                    }
                }
            } else {
                logger.error("data insert failed! {}", Json.encode(data), ar.cause());
            }
        });
    }
  }

  private Buffer insert(WebRTCLog log) throws Exception {
    String formatter = "INSERT INTO ALADDIN.WEBRTC_LOG_%s " +
                       "  USING ALADDIN.WEBRTC_LOG TAGS(%s) " +
                       "VALUES(%s)";
    String sql = String.format(formatter, log.getDeviceId(), log.getDeviceId(), DataOperator.valToSql(log));
    return Buffer.buffer(sql);
  }

  private Buffer insert(List<WebRTCLog> data) throws IllegalAccessException {
    StringBuilder sqlBuilder = new StringBuilder("INSERT INTO ");
    String formatter = "ALADDIN.WEBRTC_LOG_%s USING ALADDIN.WEBRTC_LOG TAGS(%s) VALUES(%s) ";
    for (WebRTCLog log : data) {
      sqlBuilder.append(String.format(formatter, log.getDeviceId(), log.getDeviceId(), DataOperator.valToSql(log)));
    }
    return Buffer.buffer(sqlBuilder.toString());
  }

  @Override
  public void stop() throws Exception {
    logger.info("SaveVerticle closing....");
  }
}

5. 部署Verticle

import com.tinet.twatch.aladdin.config.Configurer;
import com.tinet.twatch.aladdin.verticle.DataCollectVerticle;
import com.tinet.twatch.aladdin.verticle.SaveVerticle;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.client.WebClientOptions;

public class MainVerticle extends AbstractVerticle {

  private Logger logger = LoggerFactory.getLogger(MainVerticle.class);

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    logger.info("MainVerticle starting...");
    // 初始化sqs
    String region = config().getString("aws.region");
    String accessKey = config().getString("aws.accessKey");
    String secretKey = config().getString("aws.secretAccessKey");
    Configurer.initSQSService(region, accessKey, secretKey, config().getJsonArray(Configurer.QUEUE_URL));

    DeploymentOptions dataCollectDeploymentOptions = new DeploymentOptions();
    dataCollectDeploymentOptions.setInstances(1);
    dataCollectDeploymentOptions.setConfig(config());
    dataCollectDeploymentOptions.setWorker(true);
    Configurer.initCache(config().getInteger("aladdin.cache.expireAfterWrite"), config().getInteger("aladdin.cache.expireAfterAccess"));
    vertx.deployVerticle(DataCollectVerticle.class.getName(), dataCollectDeploymentOptions, ar -> {
      if (ar.succeeded()) {
        logger.info("DataCollectVerticle started!");
      } else {
        logger.warn("DataCollectVerticle deploy failed! {}", ar.cause().getLocalizedMessage(), ar.cause());
      }
    });
    // 初始化webclient
    WebClientOptions options = new WebClientOptions();
    options.setMaxWaitQueueSize(config().getInteger("aladdin.maxWaitQueue"));
    options.setMaxPoolSize(config().getInteger("aladdin.maxPool"));
    options.setDefaultHost(config().getString("tdengine.host"));
    options.setDefaultPort(config().getInteger("tdengine.port"));
    Configurer.initTDClient(vertx, options);
    
    DeploymentOptions saveDeploymentOptions = new DeploymentOptions();
    saveDeploymentOptions.setInstances(1);
    saveDeploymentOptions.setConfig(config());
    vertx.deployVerticle(SaveVerticle.class.getName(), saveDeploymentOptions, ar -> {
      if (ar.succeeded()) {
        logger.info("SaveVerticle started!");
      } else {
        logger.warn("SaveVerticle deploy failed!");
      }
    });
  }
}

這樣就快速實(shi)現(xian)了一(yi)個(ge)日志上報的模塊,且(qie)多個(ge)實(shi)例部署時相(xiang)互之間不會產生影響,當然在實(shi)際的生產環境中(zhong),我們(men)需要(yao)考(kao)慮的會更多。

當然,日(ri)志上報只是開始。在(zai)(zai)之后的項目開發中,我還會繼(ji)續(xu)向大(da)家(jia)介紹TDengine Database在(zai)(zai)數據分析中的應用實踐,感謝觀看(kan)。

作者簡介:錢(qian)文錦(jin) ,天潤融通基礎(chu)研發(fa)部研發(fa)工程師(shi),開源社(she)區愛好者,目前(qian)主要負責天潤融通T-Phone SDK/CTI-Cloud相關(guan)功(gong)能開發(fa)和應用。

本文首發于: