1、udtf
即“一进多出”(一行输入、返回多行)的表函数,使用场景较为广泛,近期也正好用到了,就写一下吧
2、程序场景:
将含有n个json的jsonarray转换成n条数据,且每个字段都解析出来
json:
[{"tenant_id":"","instance_number":"","trace_id":"76701623cd023","cost":360,"var":{"click_type":"itemClick","action_key":"refrsh","control_id":"toolbarap","pre_page":"imro63ad756b6448a818cf162b2d51671","page":"1462287285318893568root4963abd756b6448a818cf162b2d51671","control_name":"刷新","control_type":"toolbar"},"form_id":"bos_list","bill_form_id":"yxt_rm_assetofbill","session_id":"Trq27Jw0XnWC9t","uid":"1442810987391484933","event_id":"7e040ba625316332daf66d6049a07a16","dc_id":"1430558998880323584","event_name":"click","ctime":1666677450519,"time":166667040615,"app_id":"im","tenant_code":"yxtprd"},{"tenant_id":"","instance_number":"","trace_id":"763b706213cd023","cost":360,"var":{"click_type":"itemClick","action_key":"refresh","control_id":"toolbarap","pre_page":"imroot4963abd756b8a818cf162b2d51671","page":"1462287285318893568rod756b6448a818cf162b2d51671","control_name":"刷新","control_type":"toolar"},"form_id":"bos_list","bill_form_id":"yxt_rm_assetofbill","session_id":"Trq27Jw370XnWC9t","uid":"14428109873914933","event_id":"7e040ba62531632daf66d6049a07a16","dc_id":"14305588880323584","event_name":"click","ctime":166667750519,"time":16666708615,"app_id":"im","tenant_code":"ytprd"}]
依赖:需要引入 Flink Table API 相关依赖(提供 TableFunction、FunctionHint、Row 等,版本与集群保持一致)以及 com.alibaba:fastjson。
UDTF:
package udtf
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row
import org.apache.flink.util.StringUtils
/**
 * User-defined table function (UDTF) that expands a JSON-array string into one row per element.
 *
 * Every array element is expected to be a JSON object. Its top-level keys and the keys of the
 * nested `var` object are flattened into a 22-column row. The column order is the same in the
 * `@FunctionHint` below, in [[toRow]] and in [[getResultType]]; keep all three in sync.
 *
 * NOTE(review): `time` is back-quoted in the type string because TIME is also a type keyword in
 * Flink's data type syntax — confirm against the Flink version in use.
 */
@FunctionHint(output = new DataTypeHint("ROW<uid STRING, dc_id STRING, tenant_id STRING, tenant_code STRING, app_id STRING, form_id STRING, bill_form_id STRING, `time` BIGINT, event_id STRING, event_name STRING, instance_number STRING, cost BIGINT, ctime BIGINT, trace_id STRING, session_id STRING, page STRING, pre_page STRING, control_id STRING, control_name STRING, control_type STRING, click_type STRING, action_key STRING>"))
class JsonArrayParseUDTF extends TableFunction[Row] {

  import com.alibaba.fastjson.JSONObject
  import scala.util.control.NonFatal

  /**
   * Parses `str` as a JSON array and emits one row per object element.
   *
   * Null or blank input emits nothing. Input that is not a valid JSON array is reported on
   * stderr and emits nothing. A single malformed element is reported and skipped, so the
   * remaining elements of the same array are still emitted.
   *
   * @param str JSON text holding an array of event objects
   */
  def eval(str: String): Unit = {
    if (!StringUtils.isNullOrWhitespaceOnly(str)) {
      try {
        // Option guards against a null result from the parser.
        Option(JSON.parseArray(str)).foreach { elements =>
          for (i <- 0 until elements.size()) {
            try {
              // The element is already parsed; no need to serialise and re-parse it.
              Option(elements.getJSONObject(i)).foreach(obj => collect(toRow(obj)))
            } catch {
              case NonFatal(e) =>
                System.err.println(s"JsonArrayParseUDTF: skipping element $i: $e")
            }
          }
        }
      } catch {
        case NonFatal(e) =>
          System.err.println(s"JsonArrayParseUDTF: input is not a JSON array: $e")
      }
    }
  }

  /**
   * Flattens one event object into the output row.
   *
   * Top-level string fields are null when the key is missing. Fields taken from the nested
   * `var` object are the empty string when `var` itself is missing or null, and null when `var`
   * exists but lacks the key.
   */
  private def toRow(obj: JSONObject): Row = {
    val varObject = Option(obj.getJSONObject("var"))

    // Some(null).getOrElse("") stays null, which keeps "key missing inside var" distinct from
    // "var missing".
    def fromVar(key: String): String = varObject.map(_.getString(key)).getOrElse("")

    Row.of(
      obj.getString("uid"),
      obj.getString("dc_id"),
      obj.getString("tenant_id"),
      obj.getString("tenant_code"),
      obj.getString("app_id"),
      obj.getString("form_id"),
      obj.getString("bill_form_id"),
      obj.getLong("time"),
      obj.getString("event_id"),
      obj.getString("event_name"),
      obj.getString("instance_number"),
      obj.getLong("cost"),
      obj.getLong("ctime"),
      obj.getString("trace_id"),
      obj.getString("session_id"),
      fromVar("page"),
      fromVar("pre_page"),
      fromVar("control_id"),
      fromVar("control_name"),
      fromVar("control_type"),
      fromVar("click_type"),
      fromVar("action_key")
    )
  }

  /**
   * Declares the row layout as `TypeInformation`: 22 fields in the same order as [[toRow]].
   *
   * NOTE(review): presumably only consulted by the legacy type system; planners that use
   * annotation-based type inference are expected to read the `@FunctionHint` instead — confirm.
   */
  override def getResultType(): TypeInformation[Row] = {
    Types.ROW(
      Types.STRING, // uid
      Types.STRING, // dc_id
      Types.STRING, // tenant_id
      Types.STRING, // tenant_code
      Types.STRING, // app_id
      Types.STRING, // form_id
      Types.STRING, // bill_form_id
      Types.LONG,   // time
      Types.STRING, // event_id
      Types.STRING, // event_name
      Types.STRING, // instance_number
      Types.LONG,   // cost
      Types.LONG,   // ctime
      Types.STRING, // trace_id
      Types.STRING, // session_id
      Types.STRING, // page
      Types.STRING, // pre_page
      Types.STRING, // control_id
      Types.STRING, // control_name
      Types.STRING, // control_type
      Types.STRING, // click_type
      Types.STRING  // action_key
    )
  }
}
打包测试: jar包需要放在flink安装目录的lib里
注册函数
-- Register the class udtf.JsonArrayParseUDTF as a temporary function named myudtf.
create temporary function myudtf as 'udtf.JsonArrayParseUDTF';
读取数据
-- Source table with a single STRING column `log`, read from a local path through the
-- filesystem connector using the 'raw' format.
-- NOTE(review): presumably each record is one JSON-array string — confirm against the input file.
CREATE TABLE log_table (
log string
) WITH (
'connector' = 'filesystem',
'path' = '/opt/software/flink-1.13.1/words',
'format' = 'raw'
);
-- Expand every `log` value with myudtf and select the 22 resulting columns.
-- The alias list after `as t(...)` names the function's output columns positionally;
-- `time` is back-quoted in both lists.
select
uid,dc_id,tenant_id,tenant_code,app_id,form_id,bill_form_id,`time`,event_id,event_name,instance_number,cost,ctime,trace_id,session_id,page,pre_page,control_id,control_name,control_type,click_type,action_key
from log_table left join
lateral table (myudtf(log) ) as t(uid,dc_id,tenant_id,tenant_code,app_id,form_id,bill_form_id,`time`,event_id,event_name,instance_number,cost,ctime,trace_id,session_id,page,pre_page,control_id,control_name,control_type,click_type,action_key)
on true;
结果: 齐活
精彩内容
发表评论