Flink读写Doris操作介绍

Flink Doris Connector 支持通过 Flink 操作(读取、插入、修改、删除)Doris 中存储的数据,可以将 Doris 表映射为 DataStream 或 Table。

注意:通过 Flink 修改和删除 Doris 数据仅支持 Unique Key 模型的表。

1. 准备开发环境

在 pom.xml 中加入依赖:

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.13_2.12</artifactId>
    <version>1.0.3</version>
</dependency>

在测试库 test_db 中创建测试表并插入样例数据:

-- Switch to the test database

use test_db;

-- Create the test table flinktest.
-- AGGREGATE KEY model: (siteid, citycode, username) is the key; pv is a SUM-aggregated value column.

CREATE TABLE flinktest

(

siteid INT DEFAULT '10',

citycode SMALLINT,

username VARCHAR(32) DEFAULT '',

pv BIGINT SUM DEFAULT '0'

)

AGGREGATE KEY(siteid, citycode, username)

DISTRIBUTED BY HASH(siteid) BUCKETS 10

-- Single replica: suitable only for a test cluster.

PROPERTIES("replication_num" = "1");

-- Insert sample rows

insert into flinktest values

(1,1,'jim',2),

(2,1,'grace',2),

(3,2,'tom',2),

(4,3,'bush',3),

(5,3,'helen',3);

-- Check the table contents

select * from flinktest;

+--------+----------+----------+------+

| siteid | citycode | username | pv |

+--------+----------+----------+------+

| 1 | 1 | jim | 2 |

| 5 | 3 | helen | 3 |

| 4 | 3 | bush | 3 |

| 3 | 2 | tom | 2 |

| 2 | 1 | grace | 2 |

+--------+----------+----------+------+

Doris 和 Flink 列类型映射关系

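| Doris Type | Flink Type |
|------------|------------|
| NULL_TYPE | NULL |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| DECIMAL | DECIMAL |
| CHAR | STRING |
| LARGEINT | STRING |
| VARCHAR | STRING |
| DECIMALV2 | DECIMAL |
| TIME | DOUBLE |
| HLL | Unsupported datatype |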

2. Flink-DataStream读Doris

代码示例:

package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisStreamOptions;

import org.apache.doris.flink.datastream.DorisSourceFunction;

import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Reads every row of the Doris table {@code test_db.flinktest} through the DataStream API and
 * prints each row to stdout.
 *
 * <p>Rows are produced by {@link DorisSourceFunction} with a
 * {@link SimpleListDeserializationSchema}, so each printed element is one Doris row rendered as a
 * list of column values, e.g. {@code [4, 3, bush, 3]}.
 */
public class Flink_stream_read_doris {

    /**
     * Builds and runs the read job on a local execution environment.
     *
     * @param args unused
     * @throws IllegalStateException if the Flink job fails; the original exception is attached as
     *     the cause
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Pin the REST endpoint port of the local environment to 2000.
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        // Doris FE address, credentials and the table to scan.
        Properties props = new Properties();
        props.setProperty("fenodes", "hdt-dmcp-ops01:8130");
        props.setProperty("username", "root");
        props.setProperty("password", "123456");
        props.setProperty("table.identifier", "test_db.flinktest");

        env
            .addSource(new DorisSourceFunction(new DorisStreamOptions(props), new SimpleListDeserializationSchema()))
            .print();

        try {
            env.execute("Flink_stream_read_doris");
        } catch (Exception e) {
            // Rethrow rather than printStackTrace(): swallowing the exception would let main
            // return normally, so a failed job would exit with status 0.
            throw new IllegalStateException("Flink job reading test_db.flinktest from Doris failed", e);
        }
    }
}

/*

代码控制台输出:

[4, 3, bush, 3]

[2, 1, grace, 2]

[1, 1, jim, 2]

[5, 3, helen, 3]

[3, 2, tom, 2]

*/

3. Flink写Doris

Flink 读写 Doris 数据主要有两种方式:

- DataStream
- SQL

3.1 Flink DataStream 以 JSON 格式写入 Doris

代码示例:

package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisExecutionOptions;

import org.apache.doris.flink.cfg.DorisOptions;

import org.apache.doris.flink.cfg.DorisSink;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Writes JSON data into the Doris table {@code test_db.flinktest} with the DataStream API.
 *
 * <p>A single JSON record is emitted by {@code fromElements} and loaded into Doris through
 * {@link DorisSink} using stream load with {@code format=json}.
 */
public class Flink_stream_write_doris_json {

    /**
     * Builds and runs the JSON write job on a local execution environment.
     *
     * @param args unused
     * @throws IllegalStateException if the Flink job fails; the original exception is attached as
     *     the cause
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Pin the REST endpoint port of the local environment to 2000.
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        // Stream load properties: the records handed to the sink are JSON strings.
        // NOTE(review): strip_outer_array=true presumably matches the connector sending a batch of
        // JSON records as a single JSON array — confirm against the connector version in use.
        Properties pro = new Properties();
        pro.setProperty("format", "json");
        pro.setProperty("strip_outer_array", "true");

        env
            // JSON keys match the Doris column names; numeric values are given as quoted strings.
            .fromElements("{\"siteid\":\"10\", \"citycode\": \"1001\",\"username\": \"ww\",\"pv\":\"100\"}")
            .addSink(DorisSink.sink(
                    new DorisExecutionOptions.Builder()
                            .setBatchIntervalMs(2000L)
                            .setEnableDelete(false)
                            .setMaxRetries(3)
                            .setStreamLoadProp(pro)
                            .build(),
                    new DorisOptions.Builder()
                            .setFenodes("hdt-dmcp-ops01:8130")
                            .setUsername("root")
                            .setPassword("123456")
                            .setTableIdentifier("test_db.flinktest")
                            .build()));

        try {
            env.execute("Flink_stream_write_doris_json");
        } catch (Exception e) {
            // Rethrow rather than printStackTrace(): swallowing the exception would let main
            // return normally, so a failed load would exit with status 0.
            throw new IllegalStateException("Flink job writing JSON to test_db.flinktest failed", e);
        }
    }
}

/*

代码执行前: 5 rows

select * from flinktest;

+--------+----------+----------+------+

| siteid | citycode | username | pv |

+--------+----------+----------+------+

| 1 | 1 | jim | 2 |

| 5 | 3 | helen | 3 |

| 4 | 3 | bush | 3 |

| 3 | 2 | tom | 2 |

| 2 | 1 | grace | 2 |

+--------+----------+----------+------+

代码执行后: 6 rows

select * from flinktest;

+--------+----------+----------+------+

| siteid | citycode | username | pv |

+--------+----------+----------+------+

| 2 | 1 | grace | 2 |

| 3 | 2 | tom | 2 |

| 5 | 3 | helen | 3 |

| 1 | 1 | jim | 2 |

| 10 | 1001 | ww | 100 |

| 4 | 3 | bush | 3 |

+--------+----------+----------+------+

*/

3.2 Flink DataStream 以 RowData 格式写入 Doris

代码示例:

package com.zenitera.bigdata.doris;

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONObject;

import org.apache.doris.flink.cfg.DorisExecutionOptions;

import org.apache.doris.flink.cfg.DorisOptions;

import org.apache.doris.flink.cfg.DorisSink;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.data.GenericRowData;

import org.apache.flink.table.data.StringData;

import org.apache.flink.table.types.logical.*;

/**
 * Writes one record into the Doris table {@code test_db.flinktest} as Flink {@code RowData}.
 *
 * <p>A JSON string is parsed with fastjson and converted into a {@link GenericRowData} whose field
 * positions follow {@code fields}/{@code types}; {@link DorisSink} then loads the row into Doris.
 */
public class Flink_stream_write_doris_rowdata {

    /**
     * Builds and runs the RowData write job on a local execution environment.
     *
     * @param args unused
     * @throws IllegalStateException if the Flink job fails; the original exception is attached as
     *     the cause
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Pin the REST endpoint port of the local environment to 2000.
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        // Sink schema, index-aligned with the GenericRowData positions set below. The types mirror
        // the Doris DDL: INT, SMALLINT, VARCHAR(32), BIGINT. A bare VarCharType() would declare
        // VARCHAR(1), which does not describe the username column.
        LogicalType[] types = {new IntType(), new SmallIntType(), new VarCharType(32), new BigIntType()};
        String[] fields = {"siteid", "citycode", "username", "pv"};

        env
            .fromElements("{\"siteid\":\"100\", \"citycode\": \"1002\",\"username\": \"wang\",\"pv\":\"100\"}")
            .map(json -> {
                JSONObject obj = JSON.parseObject(json);
                GenericRowData rowData = new GenericRowData(4);
                // Boxed int, short, StringData and long, in the same order as `types`.
                rowData.setField(0, obj.getIntValue("siteid"));
                rowData.setField(1, obj.getShortValue("citycode"));
                rowData.setField(2, StringData.fromString(obj.getString("username")));
                rowData.setField(3, obj.getLongValue("pv"));
                return rowData;
            })
            .addSink(DorisSink.sink(
                    fields,
                    types,
                    new DorisExecutionOptions.Builder()
                            .setBatchIntervalMs(2000L)
                            .setEnableDelete(false)
                            .setMaxRetries(3)
                            .build(),
                    new DorisOptions.Builder()
                            .setFenodes("hdt-dmcp-ops01:8130")
                            .setUsername("root")
                            .setPassword("123456")
                            .setTableIdentifier("test_db.flinktest")
                            .build()));

        try {
            env.execute("Flink_stream_write_doris_rowdata");
        } catch (Exception e) {
            // Rethrow rather than printStackTrace(): swallowing the exception would let main
            // return normally, so a failed load would exit with status 0.
            throw new IllegalStateException("Flink job writing RowData to test_db.flinktest failed", e);
        }
    }
}

/*

代码执行前: 6 rows

select * from flinktest;

+--------+----------+----------+------+

| siteid | citycode | username | pv |

+--------+----------+----------+------+

| 2 | 1 | grace | 2 |

| 3 | 2 | tom | 2 |

| 5 | 3 | helen | 3 |

| 1 | 1 | jim | 2 |

| 10 | 1001 | ww | 100 |

| 4 | 3 | bush | 3 |

+--------+----------+----------+------+

代码执行后: 7 rows

select * from flinktest;

+--------+----------+----------+------+

| siteid | citycode | username | pv |

+--------+----------+----------+------+

| 1 | 1 | jim | 2 |

| 2 | 1 | grace | 2 |

| 3 | 2 | tom | 2 |

| 5 | 3 | helen | 3 |

| 10 | 1001 | ww | 100 |

| 100 | 1002 | wang | 100 |

| 4 | 3 | bush | 3 |

+--------+----------+----------+------+

*/

3.3 Flink-SQL 方式写Doris

先清空 Doris 测试表,并写入三条测试数据:

-- Switch to the test database
use test_db;

-- Remove all existing rows so the example starts from a known state
truncate table flinktest;

-- Seed three sample rows
insert into flinktest values

(1,1,'aaa',1),

(2,2,'bbb',2),

(3,3,'ccc',3);

-- Verify the table contents
select * from flinktest;

+--------+----------+----------+------+

| siteid | citycode | username | pv |

+--------+----------+----------+------+

| 2 | 2 | bbb | 2 |

| 1 | 1 | aaa | 1 |

| 3 | 3 | ccc | 3 |

+--------+----------+----------+------+

3 rows in set (0.01 sec)

Flink-SQL代码示例:

package com.zenitera.bigdata.doris;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.ExecutionException;

/**
 * Inserts one row into the Doris table {@code test_db.flinktest} through Flink SQL.
 *
 * <p>A Flink table {@code flink_0518} is registered with the {@code doris} connector and mapped
 * onto {@code test_db.flinktest}. A single {@code INSERT ... VALUES} statement then writes the row
 * {@code (4, 4, 'wangting', 4)}.
 */
public class Flink_SQL_doris {

    /**
     * Registers the Doris-backed table, runs the insert and waits for it to finish.
     *
     * @param args unused
     * @throws IllegalStateException if the insert job fails or the wait is interrupted; the
     *     original exception is attached as the cause
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Pin the REST endpoint port of the local environment to 2000.
        conf.setInteger("rest.port", 2000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // NOTE(review): citycode is declared INT here while the Doris column is SMALLINT (the read
        // example declares it SMALLINT); inserted values must fit the SMALLINT range.
        tEnv.executeSql("create table flink_0518(" +
                " siteid int, " +
                " citycode int, " +
                " username string, " +
                " pv bigint " +
                ")with(" +
                " 'connector' = 'doris', " +
                " 'fenodes' = 'hdt-dmcp-ops01:8130', " +
                " 'table.identifier' = 'test_db.flinktest', " +
                " 'username' = 'root', " +
                " 'password' = '123456' " +
                ")");

        try {
            // executeSql() returns once the INSERT job is submitted; await() blocks until the job
            // finishes, so main does not exit before the row is written and failures surface here.
            tEnv.executeSql("insert into flink_0518(siteid, citycode, username, pv) values(4, 4, 'wangting', 4) ")
                    .await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the insert into test_db.flinktest", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Insert into test_db.flinktest failed", e);
        }
    }

    /**
     * Row POJO mirroring the columns of {@code flink_0518}; it is not referenced by {@code main}.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Flink_0518 {
        private Integer siteid;
        private Integer citycode;
        private String username;
        private Long pv;
    }
}

执行代码,执行完成后查看Doris对应表数据进行验证:

select * from flinktest;

+--------+----------+----------+------+

| siteid | citycode | username | pv |

+--------+----------+----------+------+

| 3 | 3 | ccc | 3 |

| 2 | 2 | bbb | 2 |

| 1 | 1 | aaa | 1 |

| 4 | 4 | wangting | 4 |

+--------+----------+----------+------+

4 rows in set (0.01 sec)

3.4 Flink-SQL 方式读Doris

package com.zenitera.bigdata.doris;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Scans the Doris table {@code test_db.flinktest} with Flink SQL and prints the result.
 *
 * <p>The Doris table is exposed to Flink as {@code flink_0520} through the {@code doris}
 * connector, then queried with {@code select *}.
 */
public class Flink_SQL_doris_read {

    /**
     * Registers the Doris-backed table and prints every row of it to stdout.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Configuration localConfig = new Configuration();
        localConfig.setInteger("rest.port", 2000);

        StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment(localConfig);
        streamEnv.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // Column list of the Flink-side table; citycode is SMALLINT to match the Doris column.
        String columns = " siteid int, "
                + " citycode SMALLINT, "
                + " username string, "
                + " pv bigint ";
        // Connector options: Doris FE address, source table and credentials.
        String options = " 'connector' = 'doris', "
                + " 'fenodes' = 'hdt-dmcp-ops01:8130', "
                + " 'table.identifier' = 'test_db.flinktest', "
                + " 'username' = 'root', "
                + " 'password' = '123456' ";
        tableEnv.executeSql("create table flink_0520(" + columns + ")with(" + options + ")");

        tableEnv.executeSql("select * from flink_0520").print();
    }
}

/*

控制台输出信息:

+----+-------------+----------+---------------+---------+

| op | siteid | citycode | username | pv |

+----+-------------+----------+---------------+---------+

| +I | 1 | 1 | aaa | 1 |

| +I | 3 | 3 | ccc | 3 |

| +I | 2 | 2 | bbb | 2 |

| +I | 4 | 4 | wangting | 4 |

+----+-------------+----------+---------------+---------+

4 rows in set

*/

文章来源

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: