Author: 陶运道
Contents
Part 1: Pure DDL programming mode
Part 2: Table API programming mode
Part 1: Pure DDL programming mode
This part implements Kafka -> Flink -> HBase purely with DDL. The flow is as follows:
Flink reads the Kafka topic user_behavior and, through the Flink SQL Client, writes its contents into the HBase table venn.
I. Development environment
Component    Version
Flink (HA)   1.17.2
Zookeeper    3.4.5
Hadoop       3.2.3
HBase (HA)   2.4.15
Kafka (HA)   3.2.1
The connector JAR used here must be placed in Flink's lib directory.
SQL connector download
This exercise uses flink-sql-connector-hbase-2.2-1.17.2.jar.
Flink 1.17 ships two HBase connector versions, 1.4 and 2.2; this article uses 2.2.
Download site and documentation:
HBase | Apache Flink
In order to use the HBase connector, the following dependencies are required both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
The docs page lists, for each HBase version (1.4.x and 2.2.x), the matching Maven dependency and SQL Client JAR.
II. Commands used in this exercise
1. HBase commands
Command    Purpose
hbase shell    enter the command-line client
create 'venn','cf'    create table venn, where cf is a column family
scan 'venn',{LIMIT=>1}    view the data in the newly created table
2. Kafka commands
kafka-topics.sh --list --bootstrap-server master:9092    // list topics
kafka-console-producer.sh --bootstrap-server master:9092 --topic user_behavior    // produce messages
kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic user_behavior    // consume messages
kafka-topics.sh --bootstrap-server master:9092 --delete --topic user_behavior    // delete the topic
III. Experiment steps
1. Produce topic messages
Notes: (1) Sample data is shown below (send one message per line, pressing Enter after each).
(2) The format is JSON; every field value is wrapped in double quotes "".
kafka-console-producer.sh --bootstrap-server master:9092 --topic user_behavior
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "894923", "item_id":"3076029", "category_id": "1879194", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "834377", "item_id":"4541270", "category_id": "3738615", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "625915", "item_id":"1162383", "category_id": "570735", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "578814", "item_id":"176722", "category_id": "982926", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "873335", "item_id":"1256540", "category_id": "1451783", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "429984", "item_id":"4625350", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "866796", "item_id":"534083", "category_id": "4203730", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
{"user_id": "937166", "item_id":"321683", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26 01:00:00"}
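Rather than typing the messages above by hand, they can be generated and piped into kafka-console-producer.sh. A minimal Python sketch; the field names come from the samples, while the random ID ranges are made up for illustration:

```python
import json
import random

def make_event(ts="2017-11-26 01:00:00"):
    """Build one user_behavior record shaped like the samples above.
    All values are strings, matching the quoted-JSON format the article requires."""
    return {
        "user_id": str(random.randint(100000, 999999)),
        "item_id": str(random.randint(1000, 5000000)),
        "category_id": str(random.randint(100000, 5000000)),
        "behavior": "pv",
        "ts": ts,
    }

if __name__ == "__main__":
    # one JSON object per line, ready to pipe into kafka-console-producer.sh
    for _ in range(12):
        print(json.dumps(make_event()))
```

Run as `python gen.py | kafka-console-producer.sh --bootstrap-server master:9092 --topic user_behavior` (script name hypothetical).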
2. Create the HBase table
# hbase shell    // enter the HBase shell
hbase(main):014:0> create 'venn','cf'    // create the table with one column family, cf
3. Start the Flink SQL Client and write the Kafka topic into HBase
(1) Put the connector JAR into Flink's lib directory, then start the Flink cluster.
(2) Start the related services:
# start-all.sh      // start the Hadoop cluster
# start-cluster.sh  // start the Flink cluster
# start-hbase.sh    // start the HBase service
Note: the Flink cluster must be running; this experiment uses standalone mode.
# start the Kafka service
# sql-client.sh     // open the Flink SQL client window
(3) Read the Kafka (JSON) messages through a source table, user_log
Flink SQL> CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP(3)
) WITH ( 'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'master:9092',
'properties.group.id' = 'FlinkConsumer',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
Flink SQL> select * from user_log;
Note: the Flink-Kafka connector JAR is required for this step.
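The two json.* options in the DDL govern how malformed input is handled. Their documented semantics can be sketched in plain Python (this is not the connector's actual code, only an illustration: ignore-parse-errors drops unparseable lines, and fail-on-missing-field = false turns absent fields into NULL):

```python
import json

FIELDS = ["user_id", "item_id", "category_id", "behavior", "ts"]

def parse_line(line, ignore_parse_errors=True, fail_on_missing_field=False):
    """Mimic the connector's JSON options: return a row dict, None (skipped), or raise."""
    try:
        obj = json.loads(line)
    except json.JSONDecodeError:
        if ignore_parse_errors:
            return None          # unparseable line is silently dropped
        raise
    row = {}
    for f in FIELDS:
        if f not in obj and fail_on_missing_field:
            raise KeyError(f)    # strict mode: missing field fails the record
        row[f] = obj.get(f)      # lenient mode: missing field becomes NULL (None)
    return row
```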
(4) Create the sink table user_log_sink, mapped to the HBase table
CREATE TABLE user_log_sink (
user_id VARCHAR,
cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3))
)WITH (
'connector' = 'hbase-2.2',
'table-name' = 'venn',
'zookeeper.quorum' = 'master:2181'
);
Note:
The WITH block below uses the legacy HBase connector format. It no longer works on Flink 1.17.2, and this mistake cost a long time during the exercise.
CREATE TABLE user_log_sink (
user_id VARCHAR,
cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3))
) WITH (
'connector.type' = 'hbase',
'connector.version' = 'hbase-2.2', -- hbase version
'connector.table-name' = 'venn', -- hbase table name
'connector.zookeeper.quorum' = 'master:2181', -- zookeeper quorum
'connector.zookeeper.znode.parent' = '/hbase', -- hbase znode in zookeeper
'connector.write.buffer-flush.max-size' = '10mb', -- max flush size
'connector.write.buffer-flush.max-rows' = '1000', -- max flush rows
'connector.write.buffer-flush.interval' = '2s' -- max flush interval
);
(5) Submit the job: insert the user_log rows into user_log_sink, which writes them to HBase.
INSERT INTO user_log_sink
SELECT user_id,
ROW(item_id, category_id, behavior, ts ) as cf
FROM user_log;
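This INSERT maps user_id to the HBase rowkey and the ROW(...) fields to columns in family cf. A hypothetical Python sketch of the resulting cell layout (the helper name is made up for illustration, not part of any Flink or HBase API):

```python
def to_hbase_cells(record):
    """Show how one user_log row lands in HBase:
    rowkey = user_id, and each ROW(...) field becomes a cf:<column> cell."""
    rowkey = record["user_id"]
    cells = {
        f"cf:{col}": record[col]
        for col in ("item_id", "category_id", "behavior", "ts")
    }
    return rowkey, cells
```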
(6) Checking the result, and problems encountered
Verify the inserted rows with hbase shell; an HBase sink table cannot be queried with select * from user_log_sink;.
Note that what you are checking are the records inserted by this run, not the pre-existing contents of the HBase table. Keep this distinction clear.
Error 1: with the legacy format above, the following error appears:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.factories.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Error 2: the following error appears:
java.lang.NoSuchMethodError: org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily(Lorg/apache/flink/table/types/logical/LogicalType;Lorg/apache/flink/table/types/logical/LogicalTypeFamily;)Z
This is a version mismatch: the 1.17.2 suffix in flink-sql-connector-hbase-2.2-1.17.2.jar must match the Flink version exactly.
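Error 2 is worth guarding against up front. Assuming the conventional flink-sql-connector-hbase-&lt;hbase&gt;-&lt;flink&gt;.jar naming, a quick hypothetical check of the JAR file name:

```python
import re

def connector_versions(jar_name):
    """Extract (hbase_version, flink_version) from a connector JAR file name."""
    m = re.fullmatch(
        r"flink-sql-connector-hbase-(\d+\.\d+)-(\d+\.\d+\.\d+)\.jar", jar_name)
    if m is None:
        raise ValueError(f"unexpected JAR name: {jar_name}")
    return m.group(1), m.group(2)

def check_matches_flink(jar_name, flink_version):
    """True only when the JAR was built for exactly this Flink release."""
    return connector_versions(jar_name)[1] == flink_version
```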
Closing remarks
(1) Topic messages are organized one per line, in the form {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26 01:00:00"}, i.e.
{"field1": "value1", "field2": "value2", ……, "fieldn": "valuen"}
The topic fields must correspond to the HBase table's rowkey and column names; otherwise the data cannot be written correctly.
(2) The two real headaches are the Flink-HBase connector itself and the version correspondence, which has to be found by trial:
Flink 1.17.2 pairs with flink-sql-connector-hbase-2.2-1.17.2.jar.
(3) Few of the examples found online work as published, because versions differ and so does the code; they still have reference value. The only remedy is to consult several of them.
(4) A big task can be split into steps; the above splits into connecting Kafka, connecting HBase, and inserting, which narrows the scope of any problem.
Part 2: Table API programming mode
I. Code
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object scala_1 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // default stream time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    // build the StreamTableEnvironment
    val tenv = StreamTableEnvironment.create(env)
    val table_sql =
      """CREATE TABLE st (
        |`user_id` VARCHAR,
        |`item_id` VARCHAR,
        |`category_id` VARCHAR,
        |`behavior` VARCHAR,
        |`ts` TIMESTAMP(3))
        |WITH ('connector' = 'kafka',
        |'topic' = 'user_behavior',
        |'properties.bootstrap.servers' = 'master:9092',
        |'properties.group.id' = 'testGroup',
        |'scan.startup.mode' = 'earliest-offset',
        |'format' = 'json')""".stripMargin
    val f = tenv.executeSql(table_sql)
    // this statement must not be omitted: it acts as a refresh,
    // otherwise no stream data is supplied to the sink table sk
    tenv.toDataStream(tenv.sqlQuery("select * from st"))
    val f2 = tenv.executeSql(
      """CREATE TABLE sk (
        |user_id VARCHAR,
        |cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3))
        |) WITH (
        |'connector' = 'hbase-2.2',
        |'table-name' = 'venn',
        |'zookeeper.quorum' = 'master:2181'
        |)""".stripMargin)
    tenv.executeSql(
      """|insert into sk
         |select user_id, ROW(item_id, category_id, behavior, ts) as cf from st""".stripMargin)
    env.execute()
  }
}
II. pom.xml
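The article leaves this section empty. As a sketch, the imports in the code above suggest dependencies along these lines; the Scala-suffix artifact IDs and versions are assumptions to verify against your own setup:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>1.17.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
        <version>1.17.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>1.17.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-hbase-2.2</artifactId>
        <version>1.17.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>1.17.2</version>
    </dependency>
</dependencies>
```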