1、udtf

UDTF(User-Defined Table Function,用户自定义表函数)即“一进多出”的函数:输入一行,返回多行。使用场景较为广泛,近期也正好用到了,就写一下吧

2、程序场景:

将含有n个json的jsonarray转换成n条数据,且每个字段都解析出来

json:

[{"tenant_id":"","instance_number":"","trace_id":"76701623cd023","cost":360,"var":{"click_type":"itemClick","action_key":"refrsh","control_id":"toolbarap","pre_page":"imro63ad756b6448a818cf162b2d51671","page":"1462287285318893568root4963abd756b6448a818cf162b2d51671","control_name":"刷新","control_type":"toolbar"},"form_id":"bos_list","bill_form_id":"yxt_rm_assetofbill","session_id":"Trq27Jw0XnWC9t","uid":"1442810987391484933","event_id":"7e040ba625316332daf66d6049a07a16","dc_id":"1430558998880323584","event_name":"click","ctime":1666677450519,"time":166667040615,"app_id":"im","tenant_code":"yxtprd"},{"tenant_id":"","instance_number":"","trace_id":"763b706213cd023","cost":360,"var":{"click_type":"itemClick","action_key":"refresh","control_id":"toolbarap","pre_page":"imroot4963abd756b8a818cf162b2d51671","page":"1462287285318893568rod756b6448a818cf162b2d51671","control_name":"刷新","control_type":"toolar"},"form_id":"bos_list","bill_form_id":"yxt_rm_assetofbill","session_id":"Trq27Jw370XnWC9t","uid":"14428109873914933","event_id":"7e040ba62531632daf66d6049a07a16","dc_id":"14305588880323584","event_name":"click","ctime":166667750519,"time":16666708615,"app_id":"im","tenant_code":"ytprd"}]

依赖:

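<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.12</artifactId>
    <version>1.13.0</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.13.0</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>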

UDTF:

package udtf

import com.alibaba.fastjson.JSON

import org.apache.flink.api.common.typeinfo.TypeInformation

import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}

import org.apache.flink.table.api.Types

import org.apache.flink.table.functions.TableFunction

import org.apache.flink.types.Row

import org.apache.flink.util.StringUtils

/**
 * User-defined table function (UDTF) that expands a JSON array of event objects into
 * one [[Row]] per array element.
 *
 * Every emitted row has 22 positional fields, in this order:
 * uid, dc_id, tenant_id, tenant_code, app_id, form_id, bill_form_id, time, event_id,
 * event_name, instance_number, cost, ctime, trace_id, session_id, page, pre_page,
 * control_id, control_name, control_type, click_type, action_key.
 *
 * `time`, `cost` and `ctime` are read as longs, everything else as strings. The last
 * seven fields are read from the nested `var` object of each event.
 *
 * NOTE(review): the output type is declared twice, once through `@FunctionHint` and once
 * through `getResultType`. Which of the two the planner honours depends on how the
 * function is registered -- confirm against the Flink version in use.
 */
@FunctionHint(output = new DataTypeHint("row"))
class JsonArrayParseUDTF extends TableFunction[Row] {

  /**
   * Parses `str` as a JSON array and collects one row per element.
   *
   * Null or whitespace-only input produces no rows. Exceptions are printed to stdout and
   * never propagated. Each element is handled in its own `try`, so one malformed element
   * is skipped without discarding the elements that follow it.
   *
   * @param str JSON text expected to contain an array of event objects
   */
  def eval(str: String): Unit = {
    if (!StringUtils.isNullOrWhitespaceOnly(str)) {
      try {
        val events = JSON.parseArray(str)
        for (i <- 0 until events.size()) {
          // Isolate failures per element so a single bad record does not drop the rest.
          try {
            collect(toRow(events.get(i)))
          } catch {
            case e: Exception => println(e)
          }
        }
      } catch {
        // Reached when the input as a whole is not a parseable JSON array.
        case e: Exception => println(e)
      }
    }
  }

  /**
   * Builds the 22-field output row for a single array element.
   *
   * Keys that are absent yield `null` for that field. When the `var` object is absent
   * or JSON `null`, its seven fields are emitted as empty strings.
   */
  private def toRow(element: AnyRef): Row = {
    // Elements of a parsed array are normally JSONObject already; only fall back to
    // re-parsing the textual form for anything else.
    val event = element match {
      case obj: com.alibaba.fastjson.JSONObject => obj
      case other => JSON.parseObject(String.valueOf(other))
    }

    // Option(...) turns both "key missing" and "key present but null" into None.
    val vars = Option(event.getJSONObject("var"))
    def varField(key: String): String = vars.fold("")(_.getString(key))

    Row.of(
      event.getString("uid"),
      event.getString("dc_id"),
      event.getString("tenant_id"),
      event.getString("tenant_code"),
      event.getString("app_id"),
      event.getString("form_id"),
      event.getString("bill_form_id"),
      event.getLong("time"),
      event.getString("event_id"),
      event.getString("event_name"),
      event.getString("instance_number"),
      event.getLong("cost"),
      event.getLong("ctime"),
      event.getString("trace_id"),
      event.getString("session_id"),
      varField("page"),
      varField("pre_page"),
      varField("control_id"),
      varField("control_name"),
      varField("control_type"),
      varField("click_type"),
      varField("action_key")
    )
  }

  /**
   * Declares the type of the emitted rows: 22 fields, all STRING except the 8th (time),
   * 12th (cost) and 13th (ctime), which are LONG. The order matches the `Row.of` call
   * in `toRow`.
   */
  override def getResultType(): TypeInformation[Row] = {
    Types.ROW(
      Types.STRING, // uid
      Types.STRING, // dc_id
      Types.STRING, // tenant_id
      Types.STRING, // tenant_code
      Types.STRING, // app_id
      Types.STRING, // form_id
      Types.STRING, // bill_form_id
      Types.LONG,   // time
      Types.STRING, // event_id
      Types.STRING, // event_name
      Types.STRING, // instance_number
      Types.LONG,   // cost
      Types.LONG,   // ctime
      Types.STRING, // trace_id
      Types.STRING, // session_id
      Types.STRING, // page
      Types.STRING, // pre_page
      Types.STRING, // control_id
      Types.STRING, // control_name
      Types.STRING, // control_type
      Types.STRING, // click_type
      Types.STRING  // action_key
    )
  }
}

打包测试: jar包需要放在flink安装目录的lib里

注册函数

-- Register the UDTF class under the SQL name myudtf.
CREATE TEMPORARY FUNCTION myudtf
  AS 'udtf.JsonArrayParseUDTF';

读取数据

-- Source table with a single STRING column named log, read with the raw format.
CREATE TABLE log_table (
  log STRING
) WITH (
  'connector' = 'filesystem',
  'path' = '/opt/software/flink-1.13.1/words',
  'format' = 'raw'
);

-- Apply myudtf to the log column of every input row and expose the 22 fields it emits
-- under the alias t. The LEFT JOIN with ON TRUE keeps input rows for which the
-- function produces no output.
SELECT
  t.uid,
  t.dc_id,
  t.tenant_id,
  t.tenant_code,
  t.app_id,
  t.form_id,
  t.bill_form_id,
  t.`time`,
  t.event_id,
  t.event_name,
  t.instance_number,
  t.cost,
  t.ctime,
  t.trace_id,
  t.session_id,
  t.page,
  t.pre_page,
  t.control_id,
  t.control_name,
  t.control_type,
  t.click_type,
  t.action_key
FROM log_table
LEFT JOIN LATERAL TABLE(myudtf(log)) AS t(
  uid, dc_id, tenant_id, tenant_code, app_id, form_id, bill_form_id, `time`,
  event_id, event_name, instance_number, cost, ctime, trace_id, session_id,
  page, pre_page, control_id, control_name, control_type, click_type, action_key
)
  ON TRUE;

结果: 齐活

精彩内容

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: