Related documentation and tutorials

Flink CDC documentation

https://ververica.github.io/flink-cdc-connectors/release-2.3/

Flink connectors documentation

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/

Doris quick start

https://doris.apache.org/zh-CN/docs/dev/get-starting/quick-start

MongoDB change streams

https://www.mongodb.com/docs/manual/changeStreams/

1. Flink environment setup (set the environment up yourself)

The Flink version used here is 1.17.1.

2. Doris environment setup (set the environment up yourself)

Connect with a MySQL client (Doris speaks the MySQL protocol; the FE query port defaults to 9030, e.g. mysql -h <fe_host> -P 9030 -uroot).
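The target table referenced later (test.waybill_515) has to exist in Doris before the sync job starts, and it should use the Unique Key model so that the sink's 'sink.enable-delete' upserts/deletes take effect. A minimal sketch of creating it over Doris's MySQL protocol via JDBC (the host, schema and single-replica settings are assumptions matching the examples below; requires mysql-connector-j on the classpath):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateDorisTable {

    public static void main(String[] args) throws Exception {
        // Doris FE query port defaults to 9030 and speaks the MySQL protocol
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://192.168.1.57:9030/", "root", "");
             Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE DATABASE IF NOT EXISTS test");
            // Unique Key model: rows are upserted/deleted by `_id`
            stmt.execute("CREATE TABLE IF NOT EXISTS test.waybill_515 (" +
                    " `_id` VARCHAR(64), `userId` INT, `waybillCode` VARCHAR(64)" +
                    ") UNIQUE KEY(`_id`)" +
                    " DISTRIBUTED BY HASH(`_id`) BUCKETS 1" +
                    " PROPERTIES (\"replication_num\" = \"1\")");
        }
    }
}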

3. How the synchronization works

Listen to MongoDB's Change Stream and sync data changes to Doris in real time (change streams require MongoDB to run as a replica set or sharded cluster).
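To see the kind of events the connector consumes, here is a minimal, standalone sketch that watches the same collection with the plain MongoDB Java driver (the host, credentials, database and collection are the ones used throughout this article):

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class WatchWaybillChanges {

    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://root:123456@10.10.67.209:27017")) {
            MongoCollection<Document> coll =
                    client.getDatabase("waybillcenter_saas").getCollection("waybill_515");
            // Blocks and prints one event per insert/update/delete on the collection
            coll.watch().forEach(event ->
                    System.out.println(event.getOperationType() + " -> " + event.getFullDocument()));
        }
    }
}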

4. Synchronization implementations

4.1 SQL-script approach (connector jars must be prepared in advance, adding jars requires restarting the Flink service, and the script has to be run on the server; this is inconvenient and unfriendly to custom scenarios such as adding custom fields)

Reference: https://ververica.github.io/flink-cdc-connectors/release-2.3/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/build-real-time-data-lake-tutorial-zh.html

4.1.1 Add the sync jars to flink/lib and start Flink (adding jars requires restarting the Flink service; place all required connector jars under flink/lib in advance)

MongoDB CDC connector (e.g. flink-sql-connector-mongodb-cdc): https://repo1.maven.org/maven2/com/ververica/

Doris connector: https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.17/1.4.0/flink-doris-connector-1.17-1.4.0.jar

4.1.2 Write mongoToDorisSql.sql

-- Checkpoint interval

SET 'execution.checkpointing.interval' = '5s';

-- Set the local time zone to Asia/Shanghai

SET 'table.local-time-zone' = 'Asia/Shanghai';

-- State backend: use RocksDB (disk-based) and store checkpoints on the local filesystem

SET 'state.backend' = 'rocksdb';

SET 'state.checkpoints.dir' = 'file:///flink/checkpoints';

-- Maximum state size of the memory state backend (only relevant if the memory backend is used instead of RocksDB)

SET 'state.backend.memory.maxStateSize' = '1g';

-- Checkpoint retention policy

SET 'state.checkpoints.num-retained' = '2';

-- Default job parallelism

SET 'parallelism.default' = '4';

-- Incremental checkpoints to keep RocksDB snapshots small

SET 'state.backend.incremental' = 'true';

-- Checkpointing mode (at-least-once is cheaper than exactly-once)

SET 'execution.checkpointing.mode' = 'at_least_once';

CREATE TABLE waybill_515_form (

_id STRING,

userId INT,

waybillCode STRING,

PRIMARY KEY (_id) NOT ENFORCED

) WITH (

'connector' = 'mongodb-cdc',

'hosts' = '10.10.67.209:27017',

'username' = 'root',

'password' = '123456',

'database' = 'waybillcenter_saas',

'collection' = 'waybill_515'

);

CREATE TABLE waybill_515_to (

_id STRING,

userId INT,

waybillCode STRING,

PRIMARY KEY (_id) NOT ENFORCED

) WITH (

'connector' = 'doris',

'fenodes' = '192.168.1.57:8030',

'table.identifier' = 'test.waybill_515',

'username' = 'root',

'password' = '',

'sink.enable-delete' = 'true', -- also sync delete events

'sink.label-prefix' = 'waybill_515'

);

INSERT INTO waybill_515_to SELECT * FROM waybill_515_form f;

4.1.3 Run the script

./flink/bin/sql-client.sh -f mongoToDorisSql.sql
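After the script is submitted, you can confirm that the INSERT job is running with the Flink CLI (same assumed install path as above), or in the Flink Web UI:

./flink/bin/flink list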

4.2 SQL-based Java implementation (easier to maintain than 4.1 and friendly to custom scenarios such as adding custom fields)

Reference: https://github.com/apache/doris/blob/master/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/Cdc2DorisSQLDemo.java

4.2.1 Add the sync jars to flink/lib and start Flink (see 4.1.1)

4.2.2 WaybillSyncSQLJobTest.java

import java.util.TimeZone;
import java.util.UUID;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class WaybillSyncSQLJobTest {

public static void main(String[] args) {

if (args.length == 0) {

throw new RuntimeException("tid must not be empty; pass a comma-separated list via Program Arguments, e.g. 515,516");

}

String[] tids = args[0].split(",");

for (String tid : tids) {

StreamExecutionEnvironment env = CreateEnvUtil.getStreamExecutionEnvironment();

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.executeSql("CREATE TABLE waybill_" + tid + "_form (\n" +

"_id STRING,\n" +

"importFrom INT,\n" +

"batchId String,\n" +

"userId INT,\n" +

"PRIMARY KEY (_id) NOT ENFORCED\n" +

" ) WITH (\n" +

"'connector' = 'mongodb-cdc',\n" +

"'hosts' = '10.10.67.209:27017',\n" +

"'username' = 'root',\n" +

"'password' = '123456',\n" +

"'database' = 'waybillcenter_saas',\n" +

"'collection' = 'waybill_" + tid + "'\n" +

" );");

tEnv.executeSql("CREATE TABLE waybill_" + tid + "_to (\n" +

"_id STRING,\n" +

"importFrom INT,\n" +

"batchId String,\n" +

"userId INT,\n" +

"PRIMARY KEY (_id) NOT ENFORCED\n" +

" ) WITH (\n" +

"\t'connector' = 'doris',\n" +

"\t'fenodes' = '192.168.1.57:8030',\n" +

"\t'table.identifier' = 'test.waybill_" + tid + "',\n" +

"\t'username' = 'root',\n" +

"\t'password' = '',\n" +

"\t'sink.enable-delete' = 'true', \n" +

"\t'sink.label-prefix' = 'waybill_" + tid + "_" + UUID.randomUUID().toString() + "'\n" +

" );");

tEnv.executeSql("INSERT INTO waybill_" + tid + "_to SELECT * FROM waybill_" + tid + "_form f;");

}

}

}

class CreateEnvUtil {

/**

 * Get the execution environment

 *

 * @return a StreamExecutionEnvironment with checkpointing, restart strategy and state backend configured

 */

public static StreamExecutionEnvironment getStreamExecutionEnvironment() {

// Set the default JVM time zone for the job

TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint-related settings

// Enable checkpointing: every 6 seconds, exactly-once

env.enableCheckpointing(6000L, CheckpointingMode.EXACTLY_ONCE);

// Checkpoint timeout

env.getCheckpointConfig().setCheckpointTimeout(120000L);

// Keep externalized checkpoints when the job is cancelled

env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Minimum pause between two checkpoints

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000L);

// Restart strategy: at most 3 failures per day, 3 seconds between restarts

env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.seconds(3)));

// State backend: checkpoints written to the local filesystem

env.setStateBackend(new FsStateBackend("file:///flink/checkpoints"));

return env;

}

}

4.2.3 Submit the job (upload the jar and submit it)
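For example, after packaging the classes above into a jar, submit it with the Flink CLI (the jar name and main-class package below are placeholders for whatever your project produces):

./flink/bin/flink run -c com.example.WaybillSyncSQLJobTest waybill-sync.jar 515,516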

4.3 DataStream-based implementation (Mongo document structures are loosely nested and there are quite a few pitfalls, so 4.2 is recommended; if the source database's JSON maps cleanly onto the target database, this approach is simpler because no field mapping is needed)

Reference: https://github.com/apache/doris/blob/master/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java

4.3.1 Add the sync jars to flink/lib and start Flink (see 4.1.1)

4.3.2 A custom JSON serializer is required to support Mongo; there are quite a few cases to handle and they need tailored treatment, as shown below

MongoJsonDebeziumSchemaSerializer.java: the core customized method is extractRow(JsonNode recordRow); a sample of the change-event JSON it receives is shown right below.
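For reference, a single change event handed to serialize() looks roughly like this (abridged and illustrative); note that fullDocument and documentKey arrive as MongoDB extended-JSON strings, which is why extractRow re-parses them and flattens the $oid / $date wrappers:

{
  "_id": "{...resume token...}",
  "operationType": "insert",
  "ns": {"db": "waybillcenter_saas", "coll": "waybill_515"},
  "documentKey": "{\"_id\": {\"$oid\": \"64d1f0c2a4e8b9...\"}}",
  "fullDocument": "{\"_id\": {\"$oid\": \"64d1f0c2a4e8b9...\"}, \"userId\": 1, \"createTime\": {\"$date\": \"2023-08-08T02:00:00Z\"}}"
}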

public class MongoJsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {

private static final Logger LOG = LoggerFactory.getLogger(MongoJsonDebeziumSchemaSerializer.class);

private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s";

private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";

private static final String OP_READ = "r"; // snapshot read

private static final String OP_CREATE = "insert"; // insert

private static final String OP_UPDATE = "update"; // update

private static final String OP_DELETE = "delete"; // delete

public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; //alter table tbl add column aca int

private static final String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";

private final Pattern addDropDDLPattern;

private DorisOptions dorisOptions;

private ObjectMapper objectMapper = new ObjectMapper();

private String database;

private String table;

//table name of the cdc upstream, format is db.tbl

private String sourceTableName;

public MongoJsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern, String sourceTableName) {

this.dorisOptions = dorisOptions;

this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;

String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");

this.database = tableInfo[0];

this.table = tableInfo[1];

this.sourceTableName = sourceTableName;

// Prevent loss of decimal data precision

this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);

JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);

this.objectMapper.setNodeFactory(jsonNodeFactory);

}

@Override

public byte[] serialize(String record) throws IOException {

LOG.debug("received debezium json data {} :", record);

JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);

String op = extractJsonNode(recordRoot, "operationType");

if (Objects.isNull(op)) {

//schema change ddl

schemaChange(recordRoot);

return null;

}

Map<String, Object> valueMap;

switch (op) {

case OP_READ:

case OP_CREATE:

case OP_UPDATE:

valueMap = extractAfterRow(recordRoot);

addDeleteSign(valueMap, false);

break;

case OP_DELETE:

valueMap = extractBeforeRow(recordRoot);

addDeleteSign(valueMap, true);

break;

default:

LOG.error("parse record fail, unknown op {} in {}", op, record);

return null;

}

return objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8);

}

@VisibleForTesting

public boolean schemaChange(JsonNode recordRoot) {

boolean status = false;

try {

if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {

return false;

}

String ddl = extractDDL(recordRoot);

if (StringUtils.isNullOrWhitespaceOnly(ddl)) {

LOG.info("ddl can not do schema change:{}", recordRoot);

return false;

}

boolean doSchemaChange = checkSchemaChange(ddl);

status = doSchemaChange && execSchemaChange(ddl);

LOG.info("schema change status:{}", status);

} catch (Exception ex) {

LOG.warn("schema change error :", ex);

}

return status;

}

/**

* When cdc synchronizes multiple tables, it will capture multiple table schema changes

*/

protected boolean checkTable(JsonNode recordRoot) {

String db = extractDatabase(recordRoot);

String tbl = extractTable(recordRoot);

String dbTbl = db + "." + tbl;

return sourceTableName.equals(dbTbl);

}

private void addDeleteSign(Map<String, Object> valueMap, boolean delete) {

if (delete) {

valueMap.put(DORIS_DELETE_SIGN, "1");

} else {

valueMap.put(DORIS_DELETE_SIGN, "0");

}

}

private boolean checkSchemaChange(String ddl) throws IOException, IllegalArgumentException {

String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);

Map<String, Object> param = buildRequestParam(ddl);

if (param.size() != 2) {

return false;

}

HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);

httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());

httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));

boolean success = handleResponse(httpGet);

if (!success) {

LOG.warn("schema change can not do table {}.{}", database, table);

}

return success;

}

/**

* Build param

* {

* "isDropColumn": true,

* "columnName" : "column"

* }

*/

protected Map<String, Object> buildRequestParam(String ddl) {

Map<String, Object> params = new HashMap<>();

Matcher matcher = addDropDDLPattern.matcher(ddl);

if (matcher.find()) {

String op = matcher.group(1);

String col = matcher.group(3);

params.put("isDropColumn", op.equalsIgnoreCase("DROP"));

params.put("columnName", col);

}

return params;

}

private boolean execSchemaChange(String ddl) throws IOException, IllegalArgumentException {

Map<String, String> param = new HashMap<>();

param.put("stmt", ddl);

String requestUrl = String.format(SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database);

HttpPost httpPost = new HttpPost(requestUrl);

httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());

httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");

httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));

boolean success = handleResponse(httpPost);

return success;

}

protected String extractDatabase(JsonNode record) {

if (record.get("source").has("schema")) {

//compatible with schema

return extractJsonNode(record.get("source"), "schema");

} else {

return extractJsonNode(record.get("source"), "db");

}

}

protected String extractTable(JsonNode record) {

return extractJsonNode(record.get("source"), "table");

}

private boolean handleResponse(HttpUriRequest request) {

try (CloseableHttpClient httpclient = HttpClients.createDefault()) {

CloseableHttpResponse response = httpclient.execute(request);

final int statusCode = response.getStatusLine().getStatusCode();

if (statusCode == 200 && response.getEntity() != null) {

String loadResult = EntityUtils.toString(response.getEntity());

Map<String, Object> responseMap = objectMapper.readValue(loadResult, Map.class);

String code = responseMap.getOrDefault("code", "-1").toString();

if (code.equals("0")) {

return true;

} else {

LOG.error("schema change response:{}", loadResult);

}

}

} catch (Exception e) {

LOG.error("http request error,", e);

}

return false;

}

private String extractJsonNode(JsonNode record, String key) {

return record != null && record.get(key) != null ? record.get(key).asText() : null;

}

private Map<String, Object> extractBeforeRow(JsonNode record) throws IOException {

return extractRow(record.get("documentKey"));

}

private Map<String, Object> extractAfterRow(JsonNode record) throws IOException {

return extractRow(record.get("fullDocument"));

}

private Map<String, Object> extractRow(JsonNode recordRow) throws IOException {

// fullDocument / documentKey arrive as an extended-JSON string, so parse it again

String content = recordRow.textValue();

JsonNode jsonNode = objectMapper.readValue(content, JsonNode.class);

Iterator<String> fieldNames = jsonNode.fieldNames();

while (fieldNames.hasNext()) {

String fieldName = fieldNames.next();

JsonNode jsonNodeChild = jsonNode.get(fieldName);

// Flatten extended-JSON wrappers such as {"$date": ...} and {"$oid": ...} into plain values

JsonNode date = jsonNodeChild.get("$date");

if (date != null) {

((ObjectNode) jsonNode).set(fieldName, date);

continue;

}

JsonNode oid = jsonNodeChild.get("$oid");

if (oid != null) {

((ObjectNode) jsonNode).set(fieldName, oid);

}

}

Map<String, Object> recordMap = null;

try {

recordMap = objectMapper.convertValue(jsonNode, new TypeReference<Map<String, Object>>() {

});

} catch (Exception e) {

throw new IOException("[MongoJsonDebeziumSchemaSerializer error: " + e.getMessage() + "] " + jsonNode.toString());

}

return recordMap != null ? recordMap : new HashMap<>();

}

public String extractDDL(JsonNode record) throws JsonProcessingException {

String historyRecord = extractJsonNode(record, "historyRecord");

if (Objects.isNull(historyRecord)) {

return null;

}

String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl");

LOG.debug("received debezium ddl :{}", ddl);

if (!Objects.isNull(ddl)) {

//filter add/drop operation

Matcher matcher = addDropDDLPattern.matcher(ddl);

if (matcher.find()) {

String op = matcher.group(1);

String col = matcher.group(3);

String type = matcher.group(5);

type = handleType(type);

ddl = String.format(EXECUTE_DDL, dorisOptions.getTableIdentifier(), op, col, type);

LOG.info("parse ddl:{}", ddl);

return ddl;

}

}

return null;

}

private String authHeader() {

return "Basic " + new String(Base64.encodeBase64((dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));

}

public static MongoJsonDebeziumSchemaSerializer.Builder builder() {

return new MongoJsonDebeziumSchemaSerializer.Builder();

}

/**

 * Builder for MongoJsonDebeziumSchemaSerializer.

*/

public static class Builder {

private DorisOptions dorisOptions;

private Pattern addDropDDLPattern;

private String sourceTableName;

public MongoJsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {

this.dorisOptions = dorisOptions;

return this;

}

public MongoJsonDebeziumSchemaSerializer.Builder setPattern(Pattern addDropDDLPattern) {

this.addDropDDLPattern = addDropDDLPattern;

return this;

}

public MongoJsonDebeziumSchemaSerializer.Builder setSourceTableName(String sourceTableName) {

this.sourceTableName = sourceTableName;

return this;

}

public MongoJsonDebeziumSchemaSerializer build() {

return new MongoJsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName);

}

}

private String handleType(String type) {

if (type == null || "".equals(type)) {

return "";

}

// varchar len * 3

Pattern pattern = Pattern.compile("varchar\\(([1-9][0-9]*)\\)", Pattern.CASE_INSENSITIVE);

Matcher matcher = pattern.matcher(type);

if (matcher.find()) {

String len = matcher.group(1);

return String.format("varchar(%d)", Math.min(Integer.parseInt(len) * 3, 65533));

}

return type;

}

}

4.3.3 Usage

// tid identifies the source collection / target Doris table, e.g. taken from the program arguments as in 4.2
String tid = "515";

StreamExecutionEnvironment env = CreateEnvUtil.getStreamExecutionEnvironment();

MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()

.hosts("10.10.67.209:27017")

.username("root")

.password("123456")

.databaseList("waybillcenter_saas")

.collectionList("waybillcenter_saas.waybill_" + tid)

.startupOptions(StartupOptions.initial())

.deserializer(new JsonDebeziumDeserializationSchema())

.build();

Properties props = new Properties();

props.setProperty("format", "json");

props.setProperty("read_json_by_line", "true");

props.setProperty(LoadConstants.COLUMNS_KEY, "_id");

DorisOptions dorisOptions = DorisOptions.builder()

.setFenodes("192.168.1.57:8030")

.setTableIdentifier("test.waybill_" + tid)

.setUsername("root")

.setPassword("").build();

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();

executionBuilder.setLabelPrefix("waybill" + tid + "_" + UUID.randomUUID().toString())

.setStreamLoadProp(props).setDeletable(true);

DorisSink.Builder<String> builder = DorisSink.builder();

DorisSink<String> sink = builder.setDorisReadOptions(DorisReadOptions.builder().build())

.setDorisExecutionOptions(executionBuilder.build())

.setDorisOptions(dorisOptions)

.setSerializer(MongoJsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build())

.build();

env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MONGO Source")

.sinkTo(sink).setParallelism(1);

String jobName = "[waybill_" + tid + "] sync_from_mongo_to_doris";

env.execute(jobName);

5. Testing (full and incremental sync of inserts, updates and deletes is supported)

Insert, update and delete data in Mongo:

//db.getCollection("waybill_515").insert({"_id":"zzq-test122345679129","userId":NumberInt(1),"waybillCode":"SF11122324"})

//db.getCollection("waybill_515").find({})

//db.getCollection("waybill_515").update({ "_id": "zzq-test122345679129" }, { $set: { "userId": NumberInt(3), "waybillCode":"JD11122354"} })

//db.getCollection("waybill_515").remove({ "_id": "zzq-test122345679128" })

//db.getCollection("waybill_515").remove({})

//db.getCollection("waybill_515").remove({})

Doris data: verify the synced rows on the Doris side, e.g. SELECT * FROM test.waybill_515;
