UDF(用戶定義函數)
在有些應用場景中,應用邏輯需要的查詢無法直接使用系統內置的函數來表示。利用 UDF 功能,TDengine 可以插入用戶編寫的處理代碼并在查詢中使用它們,就能夠很方便地解決特殊應用場景中的使用需求。 UDF 通常以數據表中的一列數據做為輸入,同時支持以嵌套子查詢的結果作為輸入。
從 2.2.0.0 版本開始,TDengine 支持通過 C/C++ 語言進行 UDF 定義。接下來結合示例講解 UDF 的使用方法。
用 C/C++ 語言來定義 UDF
TDengine 提供 3 個 UDF 的源代碼示例,分別為:
標量函數
是結構最簡單的 UDF 實現。其功能為:對傳入的一個數據列(可能因 WHERE 子句進行了篩選)中的每一項,都輸出 +1 之后的值,并且要求輸入的列數據類型為 INT。
這一具體的處理邏輯在函數 void add_one(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput, int* numOfOutput, short otype, short obytes, SUdfInit* buf) 中定義。這類用于實現 UDF 的基礎計算邏輯的函數,我們稱為 udfNormalFunc,也就是對行數據塊的標量計算函數。需要注意的是,udfNormalFunc 的參數項是固定的,用于按照約束完成與引擎之間的數據交換。
- udfNormalFunc 中各參數的具體含義是:
- data:輸入數據。
- itype:輸入數據的類型。這里采用的是短整型表示法,與各種數據類型對應的值可以參見 column_meta 中的列類型說明。例如 4 用于表示 INT 型。
- iBytes:輸入數據中每個值會占用的字節數。
- numOfRows:輸入數據的總行數。
- ts:主鍵時間戳在輸入中的列數據(只讀)。
- dataOutput:輸出數據的緩沖區,緩沖區大小為用戶指定的輸出類型大小 * numOfRows。
- interBuf:中間計算結果的緩沖區,大小為用戶在創建 UDF 時指定的BUFSIZE大小。通常用于計算中間結果與最終結果不一致時使用,由引擎負責分配與釋放。
- tsOutput:主鍵時間戳在輸出時的列數據,如果非空可用于輸出結果對應的時間戳。
- numOfOutput:輸出結果的個數(行數)。
- oType:輸出數據的類型。取值含義與 itype 參數一致。
- oBytes:輸出數據中每個值占用的字節數。
- buf:用于在 UDF 與引擎間的狀態控制信息傳遞塊。
聚合函數
實現的是一個聚合函數,功能是對一組數據按絕對值取最大值。
其計算過程為:與所在查詢語句相關的數據會被分為多個行數據塊,對每個行數據塊調用 udfNormalFunc(在本例的實現代碼中,實際函數名是 abs_max)來生成每個子表的中間結果,再將子表的中間結果調用 udfMergeFunc(本例中,其實際的函數名是 abs_max_merge)進行聚合,生成超級表的最終聚合結果或中間結果。聚合查詢最后還會通過 udfFinalizeFunc(本例中,其實際的函數名是 abs_max_finalize)再把超級表的中間結果處理為最終結果,最終結果只能含0或1條結果數據。
值得注意的是,udfNormalFunc、udfMergeFunc、udfFinalizeFunc 之間,函數名約定使用相同的前綴,此前綴即 udfNormalFunc 的實際函數名。udfMergeFunc 的函數名后綴 _merge、udfFinalizeFunc 的函數名后綴 _finalize,是 UDF 實現規則的一部分,系統會按照這些函數名后綴來調用相應功能。
-
udfMergeFunc 用于對計算中間結果進行聚合,只有針對超級表的聚合查詢才需要調用該函數。本例中 udfMergeFunc 對應的實現函數為
void abs_max_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf),其中各參數的具體含義是:- data:udfNormalFunc 的輸出數據數組,如果使用了 interBuf 那么 data 就是 interBuf 的數組。
- numOfRows:data 中數據的行數。
- dataOutput:輸出數據的緩沖區,大小等于一條最終結果的大小。如果此時輸出還不是最終結果,可以選擇輸出到 interBuf 中即data中。
- numOfOutput:輸出結果的個數(行數)。
- buf:用于在 UDF 與引擎間的狀態控制信息傳遞塊。
-
udfFinalizeFunc 用于對計算結果進行最終計算,通常用于有 interBuf 使用的場景。本例中 udfFinalizeFunc 對應的實現函數為
void abs_max_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf),其中各參數的具體含義是:- dataOutput:輸出數據的緩沖區。
- interBuf:中間結算結果緩沖區,可作為輸入。
- numOfOutput:輸出數據的個數,對聚合函數來說只能是0或者1。
- buf:用于在 UDF 與引擎間的狀態控制信息傳遞塊。
其他典型場景,如協方差的計算,即可通過定義聚合UDF的方式實現。
其他 UDF 函數
用戶 UDF 程序除了需要實現上面幾個函數外,還有兩個用于初始化和釋放 UDF 與引擎間的狀態控制信息傳遞塊的函數。具體來說,也即對應 udfInitFunc 和 udfDestroyFunc。其函數名命名規則同樣是采取以 udfNormalFunc 的實際函數名為前綴,以 _init 和 _destroy 為后綴。系統會在初始化和資源釋放時調用對應名稱的函數。
-
udfInitFunc 用于初始化狀態控制信息傳遞塊。上例中 udfInitFunc 對應的實現函數為
int abs_max_init(SUdfInit* buf),其中各參數的具體含義是:- buf:用于在 UDF 與引擎間的狀態控制信息傳遞塊。
-
udfDestroyFunc 用于釋放狀態控制信息傳遞塊。上例中 udfDestroyFunc 對應的實現函數為
void abs_max_destroy(SUdfInit* buf),其中各參數的具體含義是:- buf:用于在 UDF 與引擎間的狀態控制信息傳遞塊。
目前該功能暫時沒有實際意義,待后續擴展使用。
UDF 實現方式的規則總結
根據 UDF 函數類型的不同,用戶所要實現的功能函數也不同:
- 標量函數:UDF 中需實現 udfNormalFunc。
- 聚合函數:UDF 中需實現 udfNormalFunc、udfMergeFunc(對超級表查詢)、udfFinalizeFunc。
需要注意的是,如果對應的函數不需要具體的功能,也需要實現一個空函數。
編譯 UDF
用戶定義函數的 C 語言源代碼無法直接被 TDengine 系統使用,而是需要先編譯為 .so 鏈接庫,之后才能載入 TDengine 系統。
例如,按照上一章節描述的規則準備好了用戶定義函數的源代碼 add_one.c,那么可以執行如下指令編譯得到動態鏈接庫文件:
gcc -g -O0 -fPIC -shared add_one.c -o add_one.so
這樣就準備好了動態鏈接庫 add_one.so 文件,可以供后文創建 UDF 時使用了。為了保證可靠的系統運行,編譯器 GCC 推薦使用 7.5及以上版本。
在系統中管理和使用 UDF
創建 UDF
用戶可以通過 SQL 指令在系統中加載客戶端所在主機上的 UDF 函數庫(不能通過 RESTful 接口或 HTTP 管理界面來進行這一過程)。一旦創建成功,則當前 TDengine 集群的所有用戶都可以在 SQL 指令中使用這些函數。UDF 存儲在系統的 MNode 節點上,因此即使重啟 TDengine 系統,已經創建的 UDF 也仍然可用。
在創建 UDF 時,需要區分標量函數和聚合函數。如果創建時聲明了錯誤的函數類別,則可能導致通過 SQL 指令調用函數時出錯。此外, UDF 支持輸入與輸出類型不一致,用戶需要保證輸入數據類型與 UDF 程序匹配,UDF 輸出數據類型與 OUTPUTTYPE 匹配。
-
創建標量函數:
CREATE FUNCTION ids(X) AS ids(Y) OUTPUTTYPE typename(Z) [ BUFSIZE B ];- ids(X):標量函數未來在 SQL 指令中被調用時的函數名,必須與函數實現中 udfNormalFunc 的實際名稱一致;
- ids(Y):包含 UDF 函數實現的動態鏈接庫的庫文件絕對路徑(指的是庫文件在當前客戶端所在主機上的保存路徑,通常是指向一個 .so 文件),這個路徑需要用英文單引號或英文雙引號括起來;
- typename(Z):此函數計算結果的數據類型,與上文中 udfNormalFunc 的 itype 參數不同,這里不是使用數字表示法,而是直接寫類型名稱即可;
- B:中間計算結果的緩沖區大小,單位是字節,最小 0,最大 512,如果不使用可以不設置。
例如,如下語句可以把 add_one.so 創建為系統中可用的 UDF:
CREATE FUNCTION add_one AS "/home/taos/udf_example/add_one.so" OUTPUTTYPE INT; -
創建聚合函數:
CREATE AGGREGATE FUNCTION ids(X) AS ids(Y) OUTPUTTYPE typename(Z) [ BUFSIZE B ];- ids(X):聚合函數未來在 SQL 指令中被調用時的函數名,必須與函數實現中 udfNormalFunc 的實際名稱一致;
- ids(Y):包含 UDF 函數實現的動態鏈接庫的庫文件絕對路徑(指的是庫文件在當前客戶端所在主機上的保存路徑,通常是指向一個 .so 文件),這個路徑需要用英文單引號或英文雙引號括起來;
- typename(Z):此函數計算結果的數據類型,與上文中 udfNormalFunc 的 itype 參數不同,這里不是使用數字表示法,而是直接寫類型名稱即可;
- B:中間計算結果的緩沖區大小,單位是字節,最小 0,最大 512,如果不使用可以不設置。
關于中間計算結果的使用,可以參考示例程序
例如,如下語句可以把 demo.so 創建為系統中可用的 UDF:
CREATE AGGREGATE FUNCTION demo AS "/home/taos/udf_example/demo.so" OUTPUTTYPE DOUBLE bufsize 14;
管理 UDF
- 刪除指定名稱的用戶定義函數:
DROP FUNCTION ids(X);- ids(X):此參數的含義與 CREATE 指令中的 ids(X) 參數一致,也即要刪除的函數的名字,例如
DROP FUNCTION add_one;。
- ids(X):此參數的含義與 CREATE 指令中的 ids(X) 參數一致,也即要刪除的函數的名字,例如
- 顯示系統中當前可用的所有 UDF:
SHOW FUNCTIONS;
調用 UDF
在 SQL 指令中,可以直接以在系統中創建 UDF 時賦予的函數名來調用用戶定義函數。例如:
SELECT X(c) FROM table/stable;
表示對名為 c 的數據列調用名為 X 的用戶定義函數。SQL 指令中用戶定義函數可以配合 WHERE 等查詢特性來使用。
UDF 的一些使用限制
在當前版本下,使用 UDF 存在如下這些限制:
- 在創建和調用 UDF 時,服務端和客戶端都只支持 Linux 操作系統;
- UDF 不能與系統內建的 SQL 函數混合使用,暫不支持在一條 SQL 語句中使用多個不同名的 UDF ;
- UDF 只支持以單個數據列作為輸入;
- UDF 只要創建成功,就會被持久化存儲到 MNode 節點中;
- 無法通過 RESTful 接口來創建 UDF;
- UDF 在 SQL 中定義的函數名,必須與 .so 庫文件實現中的接口函數名前綴保持一致,也即必須是 udfNormalFunc 的名稱,而且不可與 TDengine 中已有的內建 SQL 函數重名。
代碼附件
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct SUdfInit{
int maybe_null; /* 1 if function can return NULL */
int decimals; /* for real functions */
long long length; /* For string functions */
char *ptr; /* free pointer for function data */
int const_item; /* 0 if result is independent of arguments */
} SUdfInit;
void add_one(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBUf, char* tsOutput,
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
int r = 0;
// printf("add_one input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
if (itype == 4) {
for(i=0;i<numOfRows;++i) {
// printf("input %d - %d", i, *((int *)data + i));
*((int *)dataOutput+i)=*((int *)data + i) + 1;
// printf(", output %d\n", *((int *)dataOutput+i));
if (tsOutput) {
*(long long*)tsOutput=1000000;
}
}
*numOfOutput=numOfRows;
// printf("add_one out, numOfOutput:%d\n", *numOfOutput);
}
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
typedef struct SUdfInit{
int maybe_null; /* 1 if function can return NULL */
int decimals; /* for real functions */
int64_t length; /* For string functions */
char *ptr; /* free pointer for function data */
int const_item; /* 0 if result is independent of arguments */
} SUdfInit;
#define TSDB_DATA_INT_NULL 0x80000000L
#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L
void abs_max(char* data, short itype, short ibytes, int numOfRows, int64_t* ts, char* dataOutput, char* interBuf, char* tsOutput,
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
int64_t r = 0;
// printf("abs_max input data:%p, type:%d, rows:%d, ts:%p, %" PRId64 ", dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
if (itype == 5) {
r=*(int64_t *)dataOutput;
*numOfOutput=0;
for(i=0;i<numOfRows;++i) {
if (*((int64_t *)data + i) == TSDB_DATA_BIGINT_NULL) {
continue;
}
*numOfOutput=1;
//int64_t v = abs(*((int64_t *)data + i));
int64_t v = *((int64_t *)data + i);
if (v < 0) {
v = 0 - v;
}
if (v > r) {
r = v;
}
}
*(int64_t *)dataOutput=r;
// printf("abs_max out, dataoutput:%" PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}else {
*numOfOutput=0;
}
}
void abs_max_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
int i;
//int64_t r = 0;
// printf("abs_max_finalize dataoutput:%p:%d, numOfOutput:%d, buf:%p\n", dataOutput, *dataOutput, *numOfOutput, buf);
// *numOfOutput=1;
// printf("abs_max finalize, dataoutput:%" PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}
void abs_max_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
int64_t r = 0;
if (numOfRows > 0) {
r = *((int64_t *)data);
}
// printf("abs_max_merge numOfRows:%d, dataoutput:%p, buf:%p\n", numOfRows, dataOutput, buf);
for (int i = 1; i < numOfRows; ++i) {
// printf("abs_max_merge %d - %" PRId64"\n", i, *((int64_t *)data + i));
if (*((int64_t*)data + i) > r) {
r= *((int64_t*)data + i);
}
}
*(int64_t*)dataOutput=r;
if (numOfRows > 0) {
*numOfOutput=1;
} else {
*numOfOutput=0;
}
// printf("abs_max_merge, dataoutput:%" PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}
int abs_max_init(SUdfInit* buf) {
// printf("abs_max init\n");
return 0;
}
void abs_max_destroy(SUdfInit* buf) {
// printf("abs_max destroy\n");
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef struct SUdfInit{
int maybe_null; /* 1 if function can return NULL */
int decimals; /* for real functions */
long long length; /* For string functions */
char *ptr; /* free pointer for function data */
int const_item; /* 0 if result is independent of arguments */
} SUdfInit;
typedef struct SDemo{
double sum;
int num;
short otype;
}SDemo;
#define FLOAT_NULL 0x7FF00000 // it is an NAN
#define DOUBLE_NULL 0x7FFFFF0000000000L // it is an NAN
void demo(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput,
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
double r = 0;
SDemo *p = (SDemo *)interBuf;
SDemo *q = (SDemo *)dataOutput;
printf("demo input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, interBUf:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, interBuf, tsOutput, numOfOutput, buf);
for(i=0;i<numOfRows;++i) {
if (itype == 4) {
r=*((int *)data+i);
} else if (itype == 6) {
r=*((float *)data+i);
} else if (itype == 7) {
r=*((double *)data+i);
}
p->sum += r*r;
}
p->otype = otype;
p->num += numOfRows;
q->sum = p->sum;
q->num = p->num;
q->otype = p->otype;
*numOfOutput=1;
printf("demo out, sum:%f, num:%d, numOfOutput:%d\n", p->sum, p->num, *numOfOutput);
}
void demo_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
int i;
SDemo *p = (SDemo *)data;
SDemo res = {0};
printf("demo_merge input data:%p, rows:%d, dataoutput:%p, numOfOutput:%p, buf:%p\n", data, numOfRows, dataOutput, numOfOutput, buf);
for(i=0;i<numOfRows;++i) {
res.sum += p->sum * p->sum;
res.num += p->num;
p++;
}
p->sum = res.sum;
p->num = res.num;
*numOfOutput=1;
printf("demo out, sum:%f, num:%d, numOfOutput:%d\n", p->sum, p->num, *numOfOutput);
}
void demo_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
SDemo *p = (SDemo *)interBuf;
printf("demo_finalize interbuf:%p, numOfOutput:%p, buf:%p, sum:%f, num:%d\n", interBuf, numOfOutput, buf, p->sum, p->num);
if (p->otype == 6) {
if (p->num != 30000) {
*(unsigned int *)dataOutput = FLOAT_NULL;
} else {
*(float *)dataOutput = (float)(p->sum / p->num);
}
printf("finalize values:%f\n", *(float *)dataOutput);
} else if (p->otype == 7) {
if (p->num != 30000) {
*(unsigned long long *)dataOutput = DOUBLE_NULL;
} else {
*(double *)dataOutput = (double)(p->sum / p->num);
}
printf("finalize values:%f\n", *(double *)dataOutput);
}
*numOfOutput=1;
printf("demo finalize, numOfOutput:%d\n", *numOfOutput);
}
int demo_init(SUdfInit* buf) {
printf("demo init\n");
return 0;
}
void demo_destroy(SUdfInit* buf) {
printf("demo destroy\n");
}

