Reference links
Flink CDC docs
https://ververica.github.io/flink-cdc-connectors/release-2.3/
Flink connectors docs
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/
Doris quick start
https://doris.apache.org/zh-CN/docs/dev/get-starting/quick-start
MongoDB Change Streams
https://www.mongodb.com/docs/manual/changeStreams/
1. Flink environment setup (set this up yourself)
The Flink version used here is 1.17.1.
2. Doris environment setup (set this up yourself)
Connect to Doris with a MySQL client.
3. How the sync works
Listen to MongoDB Change Streams and replicate data changes to Doris in real time.
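To make this concrete, here is a minimal standalone sketch (not part of the sync job) that watches one collection's change stream with the plain MongoDB Java driver, using the same placeholder host and credentials as the rest of this article:
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;

public class ChangeStreamDemo {
    public static void main(String[] args) {
        // Change streams require MongoDB to run as a replica set
        try (MongoClient client = MongoClients.create("mongodb://root:123456@10.10.67.209:27017")) {
            client.getDatabase("waybillcenter_saas")
                    .getCollection("waybill_515")
                    .watch() // blocks, emitting one event per insert/update/delete
                    .forEach(event -> System.out.println(
                            event.getOperationType() + " -> " + event.getFullDocument()));
        }
    }
}
This is essentially what the mongodb-cdc connector does on our behalf, plus snapshotting, offset tracking, and failover.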
4. Sync implementations
4.1 SQL-script approach (the connector jars must be staged up front, adding a jar requires restarting the Flink service, and the script must be run on the server, which is inconvenient; it is also unfriendly to customization such as adding custom fields)
Reference: https://ververica.github.io/flink-cdc-connectors/release-2.3/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/build-real-time-data-lake-tutorial-zh.html
4.1.1 Put the sync jars into flink/lib and start Flink (adding a jar requires restarting the Flink service; stage all connector jars in flink/lib up front)
MongoDB CDC connector (browse for the matching version): https://repo1.maven.org/maven2/com/ververica/
Doris connector: https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.17/1.4.0/flink-doris-connector-1.17-1.4.0.jar
4.1.2 Write mongoToDorisSql.sql
-- Checkpoint interval
SET 'execution.checkpointing.interval' = '5s';
-- Set the local time zone to Asia/Shanghai
SET 'table.local-time-zone' = 'Asia/Shanghai';
-- Use a disk-based (filesystem) state backend
SET 'state.backend' = 'filesystem';
SET 'state.checkpoints.dir' = 'file:///flink/checkpoints';
-- Max state size for the memory state backend (only relevant when the memory backend is used)
SET 'state.backend.memory.maxStateSize' = '1g';
-- Checkpoint retention policy
SET 'state.checkpoints.num-retained' = '2';
-- Default job parallelism
SET 'parallelism.default' = '4';
-- Enable incremental checkpoints (only effective with RocksDB)
SET 'state.backend.incremental' = 'true';
-- Relax checkpointing to at-least-once (the key in Flink 1.17 is execution.checkpointing.mode)
SET 'execution.checkpointing.mode' = 'AT_LEAST_ONCE';
-- Switch to the RocksDB state backend (note: this overrides the 'filesystem' setting above; keep only one)
SET 'state.backend' = 'rocksdb';
-- Source table over MongoDB change streams
CREATE TABLE waybill_515_form (
_id STRING,
userId INT,
waybillCode STRING,
PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = '10.10.67.209:27017',
'username' = 'root',
'password' = '123456',
'database' = 'waybillcenter_saas',
'collection' = 'waybill_515'
);
-- Sink table in Doris
CREATE TABLE waybill_515_to (
_id STRING,
userId INT,
waybillCode STRING,
PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'doris',
'fenodes' = '192.168.1.57:8030',
'table.identifier' = 'test.waybill_515',
'username' = 'root',
'password' = '',
'sink.enable-delete' = 'true', -- propagate delete events to Doris
'sink.label-prefix' = 'waybill_515'
);
INSERT INTO waybill_515_to SELECT * FROM waybill_515_form f;
4.1.3 Run the script
./flink/bin/sql-client.sh -f mongoToDorisSql.sql
4.2 SQL-based Java implementation (easier to maintain than 4.1, and friendly to customization such as adding custom fields)
Reference: https://github.com/apache/doris/blob/master/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/Cdc2DorisSQLDemo.java
4.2.1 Put the sync jars into flink/lib and start Flink (see 4.1.1)
4.2.2 WaybillSyncSQLJobTest.java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.TimeZone;
import java.util.UUID;

public class WaybillSyncSQLJobTest {
public static void main(String[] args) {
if (args.length == 0) {
throw new RuntimeException("tid must not be empty; pass a comma-separated list (e.g. 515,516) in Program Arguments");
}
String[] tids = args[0].split(",");
for (String tid : tids) {
StreamExecutionEnvironment env = CreateEnvUtil.getStreamExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql("CREATE TABLE waybill_" + tid + "_form (\n" +
"_id STRING,\n" +
"importFrom INT,\n" +
"batchId String,\n" +
"userId INT,\n" +
"PRIMARY KEY (_id) NOT ENFORCED\n" +
" ) WITH (\n" +
"'connector' = 'mongodb-cdc',\n" +
"'hosts' = '10.10.67.209:27017',\n" +
"'username' = 'root',\n" +
"'password' = '123456',\n" +
"'database' = 'waybillcenter_saas',\n" +
"'collection' = 'waybill_" + tid + "'\n" +
" );");
tEnv.executeSql("CREATE TABLE waybill_" + tid + "_to (\n" +
"_id STRING,\n" +
"importFrom INT,\n" +
"batchId String,\n" +
"userId INT,\n" +
"PRIMARY KEY (_id) NOT ENFORCED\n" +
" ) WITH (\n" +
"\t'connector' = 'doris',\n" +
"\t'fenodes' = '192.168.1.57:8030',\n" +
"\t'table.identifier' = 'test.waybill_" + tid + "',\n" +
"\t'username' = 'root',\n" +
"\t'password' = '',\n" +
"\t'sink.enable-delete' = 'true', \n" +
"\t'sink.label-prefix' = 'waybill_" + tid + "_" + UUID.randomUUID().toString() + "'\n" +
" );");
tEnv.executeSql("INSERT INTO waybill_" + tid + "_to SELECT * FROM waybill_" + tid + "_form f;");
}
}
}
class CreateEnvUtil {
/**
* Build a StreamExecutionEnvironment with checkpointing configured
*
* @return the configured execution environment
*/
public static StreamExecutionEnvironment getStreamExecutionEnvironment() {
// Set the default time zone the job runs with
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint-related settings
// Enable checkpointing: every 6s, exactly-once
env.enableCheckpointing(6000L, CheckpointingMode.EXACTLY_ONCE);
// Checkpoint timeout
env.getCheckpointConfig().setCheckpointTimeout(120000L);
// Retain externalized checkpoints after the job is cancelled
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Minimum pause between two checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000L);
// Restart strategy: at most 3 failures per day, with a 3s delay
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.seconds(3)));
// State backend (FsStateBackend is deprecated in Flink 1.17 in favor of
// HashMapStateBackend plus a checkpoint storage path, but still works)
env.setStateBackend(new FsStateBackend("file:///flink/checkpoints"));
return env;
}
}
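As an illustration of why this approach is friendly to custom fields: because the statements are plain strings, the INSERT can compute extra columns in SQL without touching any connector jar. A hypothetical variant of the INSERT above (it assumes waybill_<tid>_to and the Doris table both declare an extra syncTime TIMESTAMP(3)/DATETIME column, which the schema above does not have):
// Hypothetical: enrich each row with the sync time.
// Assumes an extra syncTime TIMESTAMP(3) column in the sink DDL and Doris table.
tEnv.executeSql("INSERT INTO waybill_" + tid + "_to " +
        "SELECT f._id, f.importFrom, f.batchId, f.userId, LOCALTIMESTAMP AS syncTime " +
        "FROM waybill_" + tid + "_form f;");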
4.2.3 Submit the job (upload the jar and submit it)
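For example (the jar name below is a placeholder for your build artifact; prefix the class with its package if it has one):
./flink/bin/flink run -c WaybillSyncSQLJobTest waybill-sync-job.jar 515,516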
4.3 Stream-based implementation (the Mongo document structure is messy and has quite a few pitfalls, so 4.2 is recommended; but if the source database's JSON maps directly onto the target database, this approach is simpler since no field mapping is needed)
Reference: https://github.com/apache/doris/blob/master/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/dbsync/DatabaseFullSync.java
4.3.1 Put the sync jars into flink/lib and start Flink (see 4.1.1)
4.3.2 A custom JSON serializer can add Mongo support; quite a few cases need bespoke handling, as below
MongoJsonDebeziumSchemaSerializer.java -- the core customized method is extractRow(JsonNode recordRow)
// Imports for reference (package paths as of flink-doris-connector 1.4.x; they may differ in other versions)
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.IllegalArgumentException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.HttpGetWithEntity;
import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;

public class MongoJsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {
private static final Logger LOG = LoggerFactory.getLogger(MongoJsonDebeziumSchemaSerializer.class);
private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s";
private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
private static final String OP_READ = "r"; // snapshot read
private static final String OP_CREATE = "insert"; // insert
private static final String OP_UPDATE = "update"; // update
private static final String OP_DELETE = "delete"; // delete
public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s"; // e.g. ALTER TABLE tbl ADD COLUMN aca INT
private static final String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
private final Pattern addDropDDLPattern;
private DorisOptions dorisOptions;
private ObjectMapper objectMapper = new ObjectMapper();
private String database;
private String table;
//table name of the cdc upstream, format is db.tbl
private String sourceTableName;
public MongoJsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern, String sourceTableName) {
this.dorisOptions = dorisOptions;
this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
this.database = tableInfo[0];
this.table = tableInfo[1];
this.sourceTableName = sourceTableName;
// Prevent loss of decimal data precision
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
this.objectMapper.setNodeFactory(jsonNodeFactory);
}
@Override
public byte[] serialize(String record) throws IOException {
LOG.debug("received debezium json data {} :", record);
JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
String op = extractJsonNode(recordRoot, "operationType");
if (Objects.isNull(op)) {
//schema change ddl
schemaChange(recordRoot);
return null;
}
Map<String, String> valueMap;
switch (op) {
case OP_READ:
case OP_CREATE:
case OP_UPDATE:
valueMap = extractAfterRow(recordRoot);
addDeleteSign(valueMap, false);
break;
case OP_DELETE:
valueMap = extractBeforeRow(recordRoot);
addDeleteSign(valueMap, true);
break;
default:
LOG.error("parse record fail, unknown op {} in {}", op, record);
return null;
}
return objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8);
}
@VisibleForTesting
public boolean schemaChange(JsonNode recordRoot) {
boolean status = false;
try {
if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
return false;
}
String ddl = extractDDL(recordRoot);
if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
LOG.info("ddl can not do schema change:{}", recordRoot);
return false;
}
boolean doSchemaChange = checkSchemaChange(ddl);
status = doSchemaChange && execSchemaChange(ddl);
LOG.info("schema change status:{}", status);
} catch (Exception ex) {
LOG.warn("schema change error :", ex);
}
return status;
}
/**
* When cdc synchronizes multiple tables, it will capture multiple table schema changes
*/
protected boolean checkTable(JsonNode recordRoot) {
String db = extractDatabase(recordRoot);
String tbl = extractTable(recordRoot);
String dbTbl = db + "." + tbl;
return sourceTableName.equals(dbTbl);
}
private void addDeleteSign(Map<String, String> valueMap, boolean delete) {
if (delete) {
valueMap.put(DORIS_DELETE_SIGN, "1");
} else {
valueMap.put(DORIS_DELETE_SIGN, "0");
}
}
private boolean checkSchemaChange(String ddl) throws IOException, IllegalArgumentException {
String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database, table);
Map<String, Object> param = buildRequestParam(ddl);
if (param.size() != 2) {
return false;
}
HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
boolean success = handleResponse(httpGet);
if (!success) {
LOG.warn("schema change can not do table {}.{}", database, table);
}
return success;
}
/**
* Build param
* {
* "isDropColumn": true,
* "columnName" : "column"
* }
*/
protected Map<String, Object> buildRequestParam(String ddl) {
Map<String, Object> params = new HashMap<>();
Matcher matcher = addDropDDLPattern.matcher(ddl);
if (matcher.find()) {
String op = matcher.group(1);
String col = matcher.group(3);
params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
params.put("columnName", col);
}
return params;
}
private boolean execSchemaChange(String ddl) throws IOException, IllegalArgumentException {
Map<String, String> param = new HashMap<>();
param.put("stmt", ddl);
String requestUrl = String.format(SCHEMA_CHANGE_API, RestService.randomEndpoint(dorisOptions.getFenodes(), LOG), database);
HttpPost httpPost = new HttpPost(requestUrl);
httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
boolean success = handleResponse(httpPost);
return success;
}
protected String extractDatabase(JsonNode record) {
if (record.get("source").has("schema")) {
//compatible with schema
return extractJsonNode(record.get("source"), "schema");
} else {
return extractJsonNode(record.get("source"), "db");
}
}
protected String extractTable(JsonNode record) {
return extractJsonNode(record.get("source"), "table");
}
private boolean handleResponse(HttpUriRequest request) {
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
CloseableHttpResponse response = httpclient.execute(request);
final int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200 && response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, Object> responseMap = objectMapper.readValue(loadResult, Map.class);
String code = responseMap.getOrDefault("code", "-1").toString();
if (code.equals("0")) {
return true;
} else {
LOG.error("schema change response:{}", loadResult);
}
}
} catch (Exception e) {
LOG.error("http request error,", e);
}
return false;
}
private String extractJsonNode(JsonNode record, String key) {
return record != null && record.get(key) != null ? record.get(key).asText() : null;
}
private Map<String, String> extractBeforeRow(JsonNode record) throws IOException {
return extractRow(record.get("documentKey"));
}
private Map<String, String> extractAfterRow(JsonNode record) throws IOException {
return extractRow(record.get("fullDocument"));
}
private Map<String, String> extractRow(JsonNode recordRow) throws IOException {
String content = recordRow.textValue();
JsonNode jsonNode = objectMapper.readValue(content, JsonNode.class);
Iterator<String> fieldNames = jsonNode.fieldNames();
while (fieldNames.hasNext()) {
String fieldName = fieldNames.next();
JsonNode jsonNodeChild = jsonNode.get(fieldName);
JsonNode date = jsonNodeChild.get("$date");
if (date != null) {
((ObjectNode) jsonNode).set(fieldName, date);
continue;
}
JsonNode oid = jsonNodeChild.get("$oid");
if (oid != null) {
((ObjectNode) jsonNode).set(fieldName, oid);
}
}
Map<String, String> recordMap;
try {
recordMap = objectMapper.convertValue(jsonNode, new TypeReference<Map<String, String>>() {
});
} catch (Exception e) {
throw new IOException("[MongoJsonDebeziumSchemaSerializer error: " + e.getMessage() + "] " + jsonNode.toString());
}
return recordMap != null ? recordMap : new HashMap<>();
}
public String extractDDL(JsonNode record) throws JsonProcessingException {
String historyRecord = extractJsonNode(record, "historyRecord");
if (Objects.isNull(historyRecord)) {
return null;
}
String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl");
LOG.debug("received debezium ddl :{}", ddl);
if (!Objects.isNull(ddl)) {
//filter add/drop operation
Matcher matcher = addDropDDLPattern.matcher(ddl);
if (matcher.find()) {
String op = matcher.group(1);
String col = matcher.group(3);
String type = matcher.group(5);
type = handleType(type);
ddl = String.format(EXECUTE_DDL, dorisOptions.getTableIdentifier(), op, col, type);
LOG.info("parse ddl:{}", ddl);
return ddl;
}
}
return null;
}
private String authHeader() {
return "Basic " + new String(Base64.encodeBase64((dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
}
public static MongoJsonDebeziumSchemaSerializer.Builder builder() {
return new MongoJsonDebeziumSchemaSerializer.Builder();
}
/**
* Builder for JsonDebeziumSchemaSerializer.
*/
public static class Builder {
private DorisOptions dorisOptions;
private Pattern addDropDDLPattern;
private String sourceTableName;
public MongoJsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
return this;
}
public MongoJsonDebeziumSchemaSerializer.Builder setPattern(Pattern addDropDDLPattern) {
this.addDropDDLPattern = addDropDDLPattern;
return this;
}
public MongoJsonDebeziumSchemaSerializer.Builder setSourceTableName(String sourceTableName) {
this.sourceTableName = sourceTableName;
return this;
}
public MongoJsonDebeziumSchemaSerializer build() {
return new MongoJsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern, sourceTableName);
}
}
private String handleType(String type) {
if (type == null || "".equals(type)) {
return "";
}
// varchar len * 3
Pattern pattern = Pattern.compile("varchar\\(([1-9][0-9]*)\\)", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(type);
if (matcher.find()) {
String len = matcher.group(1);
return String.format("varchar(%d)", Math.min(Integer.parseInt(len) * 3, 65533));
}
return type;
}
}
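To see what extractRow has to normalize: the MongoDB source emits Mongo extended JSON, where ObjectIds and dates arrive wrapped as {"$oid": ...} and {"$date": ...} objects. Below is a minimal standalone sketch of just that unwrapping step; the class name, document, and values are made up for illustration:
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Iterator;

public class ExtractRowSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Made-up fullDocument payload with extended-JSON wrappers
        String content = "{\"_id\":{\"$oid\":\"64b0f1c2e4b0a1b2c3d4e5f6\"},"
                + "\"createTime\":{\"$date\":1689140000000},"
                + "\"waybillCode\":\"SF11122324\"}";
        JsonNode jsonNode = mapper.readTree(content);
        Iterator<String> fieldNames = jsonNode.fieldNames();
        while (fieldNames.hasNext()) {
            String fieldName = fieldNames.next();
            JsonNode child = jsonNode.get(fieldName);
            // Replace the {"$date": ...} / {"$oid": ...} wrapper with its inner value,
            // mirroring what extractRow does above
            if (child.has("$date")) {
                ((ObjectNode) jsonNode).set(fieldName, child.get("$date"));
            } else if (child.has("$oid")) {
                ((ObjectNode) jsonNode).set(fieldName, child.get("$oid"));
            }
        }
        // Prints: {"_id":"64b0f1c2e4b0a1b2c3d4e5f6","createTime":1689140000000,"waybillCode":"SF11122324"}
        System.out.println(jsonNode);
    }
}
Note that only top-level fields are unwrapped; nested documents are exactly the kind of pitfall section 4.3 warns about.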
4.3.3 Usage
// Snippet from the job's main method; tid identifies the target collection, e.g.:
String tid = "515";
StreamExecutionEnvironment env = CreateEnvUtil.getStreamExecutionEnvironment();
MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
.hosts("10.10.67.209:27017")
.username("root")
.password("123456")
.databaseList("waybillcenter_saas")
.collectionList("waybillcenter_saas.waybill_" + tid)
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
Properties props = new Properties();
props.setProperty("format", "json");
props.setProperty("read_json_by_line", "true");
props.setProperty(LoadConstants.COLUMNS_KEY, "_id");
DorisOptions dorisOptions = DorisOptions.builder()
.setFenodes("192.168.1.57:8030")
.setTableIdentifier("test.waybill_" + tid)
.setUsername("root")
.setPassword("").build();
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("waybill" + tid + "_" + UUID.randomUUID().toString())
.setStreamLoadProp(props).setDeletable(true);
DorisSink.Builder<String> sinkBuilder = DorisSink.builder();
DorisSink<String> sink = sinkBuilder
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisOptions)
.setSerializer(MongoJsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build())
.build();
env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MONGO Source")
.sinkTo(sink).setParallelism(1);
String jobName = "[waybill_" + tid + "] sync_from_mongo_to_doris";
env.execute(jobName);
5. Testing (full and incremental sync of inserts, updates, and deletes)
Insert, update, and delete some MongoDB data:
//db.getCollection("waybill_515").insert({"_id":"zzq-test122345679129","userId":NumberInt(1),"waybillCode":"SF11122324"})
//db.getCollection("waybill_515").find({})
//db.getCollection("waybill_515").update({ "_id": "zzq-test122345679129" }, { $set: { "userId": NumberInt(3), "waybillCode":"JD11122354"} })
//db.getCollection("waybill_515").remove({ "_id": "zzq-test122345679128" })
//db.getCollection("waybill_515").remove({})
//db.getCollection("waybill_515").remove({})
Then check the Doris side: query test.waybill_515 with the mysql client and confirm each change is reflected.