1、使用 StringDebeziumDeserializationSchema

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1657163088, file=binlog.000020, pos=7231, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.flink.user', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.flink.user.Key:STRUCT}, value=Struct{before=Struct{id=3,username=test,password=test123},after=Struct{id=3,username=test,password=test},source=Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1657163088000,db=flink,table=user,server_id=1,file=binlog.000020,pos=7379,row=0},op=u,ts_ms=1657163088967}, valueSchema=Schema{mysql_binlog_source.flink.user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

2、使用 JsonDebeziumDeserializationSchema

{"before":null,"after":{"id":2,"username":"root","password":"root"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"flink","sequence":null,"table":"user","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1657163169368,"transaction":null}

以上两种都是 Flink CDC 自带的反序列化器。显然第二种(JSON)更便于下游进行数据处理,但它仍然带有大量下游用不到的元数据(如 source 中的 binlog 位点信息);自定义反序列化器只输出库名、表名、操作类型和前后镜像数据,结构更精简、可读性更强。

3、使用自定义反序列化器

package com.daidai.cdc;

import com.alibaba.fastjson.JSONObject;

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

import io.debezium.data.Envelope;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.util.Collector;

import org.apache.kafka.connect.data.Field;

import org.apache.kafka.connect.data.Struct;

import org.apache.kafka.connect.source.SourceRecord;

/**
 * Custom Flink CDC deserializer that converts each Debezium change record into a compact JSON
 * string of the form:
 *
 * <pre>{@code
 * {
 *   "database": "<database name>",
 *   "table": "<table name>",
 *   "operation": "<Envelope.Operation constant name, e.g. READ, CREATE, UPDATE, DELETE>",
 *   "data": {"before": {<column>: <value>, ...}, "after": {<column>: <value>, ...}}
 * }
 * }</pre>
 *
 * <p>A row image that Debezium sends as {@code null} is emitted as an empty object, so
 * {@code before} is {@code {}} for snapshot reads and inserts and {@code after} is {@code {}} for
 * deletes. Downstream consumers therefore always see the same JSON shape.
 */
public class CustomerSchema implements DebeziumDeserializationSchema<String> {

    /**
     * Converts one change record into a JSON string and emits it.
     *
     * @param sourceRecord the Debezium record; its topic is expected to be
     *     {@code <server>.<database>.<table>}
     * @param collector receives the JSON string; nothing is emitted for records whose value is
     *     {@code null} (tombstones)
     * @throws IllegalArgumentException if the topic has no database and table segments
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        Struct value = (Struct) sourceRecord.value();
        if (value == null) {
            // Tombstone record: it carries no row data, so there is nothing to serialize.
            return;
        }

        // Topic layout is <server>.<database>.<table>. String.split takes a regex, so the dot must be
        // escaped: a bare "." matches every character and would yield an empty array.
        String topic = sourceRecord.topic();
        String[] topicParts = topic.split("\\.");
        if (topicParts.length < 3) {
            throw new IllegalArgumentException(
                    "Expected topic <server>.<database>.<table> but got: " + topic);
        }
        String database = topicParts[1];
        String table = topicParts[2];

        // Row images before and after the change; either one may be null depending on the operation.
        JSONObject data = new JSONObject();
        data.put("before", structToJson(value.getStruct("before")));
        data.put("after", structToJson(value.getStruct("after")));

        Envelope.Operation op = Envelope.operationFor(sourceRecord);

        JSONObject object = new JSONObject();
        object.put("database", database);
        object.put("table", table);
        // Store the enum name explicitly rather than relying on fastjson's enum serialization settings.
        object.put("operation", op == null ? null : op.name());
        object.put("data", data);

        collector.collect(object.toJSONString());
    }

    /**
     * Copies every field of a row image into a JSON object keyed by field name.
     *
     * @param row the row image, possibly {@code null}
     * @return a new JSON object; empty when {@code row} is {@code null}
     */
    private static JSONObject structToJson(Struct row) {
        JSONObject json = new JSONObject();
        if (row == null) {
            return json;
        }
        for (Field field : row.schema().fields()) {
            json.put(field.name(), row.get(field));
        }
        return json;
    }

    /** Declares that this deserializer produces {@code String} elements. */
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

文章链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: