Author: Tao Yundao

Contents

    Part 1: Pure DDL programming mode

    Part 2: Table API programming mode

Part 1: Pure DDL programming mode

Implementing Kafka -> Flink -> HBase purely with DDL. The flow is as follows:

Using the Flink SQL Client, Flink reads the contents of the Kafka topic user_behavior and writes them into the HBase table venn.

I. Development environment

Component       Version

Flink (HA)      1.17.2

Zookeeper       3.4.5

Hadoop          3.2.3

HBase (HA)      2.4.15

Kafka (HA)      3.2.1

The connector JAR used in this exercise goes into Flink's lib directory.

SQL connector download location: see the Flink documentation page linked below.

The problem in this exercise was solved with flink-sql-connector-hbase-2.2-1.17.2.jar.

Flink 1.17 provides HBase connectors for two HBase versions, 1.4 and 2.2; version 2.2 is used here.

Download site and documentation:

HBase | Apache Flink

In order to use the HBase connector, the following dependencies are required both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

HBase version    Maven dependency                                          SQL Client JAR

1.4.x            org.apache.flink : flink-connector-hbase-1.4 : 1.17.2     Download
2.2.x            org.apache.flink : flink-connector-hbase-2.2 : 1.17.2     Download

II. Operations used in this exercise

1. HBase commands

Command                         Purpose

hbase shell                     enter the HBase command-line client

create 'venn','cf'              create the table venn, where cf is the column family

scan 'venn',{LIMIT=>1}          view the data in the newly created table

2. Kafka commands

kafka-topics.sh --list --bootstrap-server master:9092                                              # list topics

kafka-console-producer.sh --bootstrap-server master:9092 --topic user_behavior                     # produce messages

kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic user_behavior    # consume from the beginning

kafka-topics.sh --bootstrap-server master:9092 --delete --topic user_behavior                      # delete the topic

III. Experiment steps

1. Produce messages to the topic

Note: (1) Sample data is shown below (press Enter after each record to send one message at a time).

      (2) The format is JSON, and every field value is wrapped in double quotes. (A programmatic alternative for producing the messages is sketched after the sample records.)

kafka-console-producer.sh --bootstrap-server master:9092 --topic user_behavior

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "894923", "item_id":"3076029", "category_id": "1879194", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "834377", "item_id":"4541270", "category_id": "3738615", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "625915", "item_id":"1162383", "category_id": "570735", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "578814", "item_id":"176722", "category_id": "982926", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "873335", "item_id":"1256540", "category_id": "1451783", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "429984", "item_id":"4625350", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "866796", "item_id":"534083", "category_id": "4203730", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"user_id": "937166", "item_id":"321683", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

2. Create the HBase table

# hbase shell                               -- enter the HBase command line

hbase(main):014:0> create 'venn','cf'       -- create the table with a single column family, cf

3. Start the Flink SQL Client and write the Kafka topic into HBase

(1) Place the connector JAR in Flink's lib directory and start the Flink cluster.

(2) Start the related services:

# start-all.sh        // start the Hadoop cluster

# start-cluster.sh    // start the Flink cluster

# start-hbase.sh      // start the HBase service

Note: the Flink cluster must be running as well; this experiment uses standalone mode.

# (start the Kafka service)

# sql-client.sh       // open the Flink SQL Client

(3) Read the Kafka (JSON) messages through the source table user_log:

Flink SQL> CREATE TABLE user_log (

    user_id VARCHAR,

    item_id VARCHAR,

    category_id VARCHAR,

    behavior VARCHAR,

    ts TIMESTAMP(3)

) WITH (  'connector' = 'kafka',

  'topic' = 'user_behavior',

  'properties.bootstrap.servers' = 'master:9092',

  'properties.group.id' = 'FlinkConsumer',

  'scan.startup.mode' = 'earliest-offset',

  'format' = 'json',

  'json.fail-on-missing-field' = 'false',

  'json.ignore-parse-errors' = 'true'

);

Flink SQL> select * from user_log;

Note: the Kafka connector JAR is also required for Flink to connect to Kafka.

(4) Create the sink table user_log_sink, which maps to the HBase table:

CREATE TABLE user_log_sink (

    user_id VARCHAR,

    cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3))

)WITH (

 'connector' = 'hbase-2.2',

 'table-name' = 'venn',

 'zookeeper.quorum' = 'master:2181'

);

Note:

The following is the legacy HBase connector format. Quite a lot of time was lost to this error when practicing on Flink 1.17.2.

CREATE TABLE user_log_sink (

    user_id VARCHAR,

    cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3))

) WITH (

    'connector.type' = 'hbase',

    'connector.version' = 'hbase-2.2',                 -- hbase version

    'connector.table-name' = 'venn',                  -- hbase table name

    'connector.zookeeper.quorum' = 'master:2181',       -- zookeeper quorum

    'connector.zookeeper.znode.parent' = '/hbase',     -- hbase znode in zookeeper

    'connector.write.buffer-flush.max-size' = '10mb',  -- max flush size

    'connector.write.buffer-flush.max-rows' = '1000', -- max flush rows

    'connector.write.buffer-flush.interval' = '2s'    -- max flush interval

);

(5) Submit the job: insert the contents of user_log into user_log_sink, which writes them into HBase.

INSERT INTO user_log_sink

SELECT user_id,

  ROW(item_id, category_id, behavior, ts ) as cf

FROM user_log;

(6) Problems when querying

To check the inserted data, use the hbase shell; it cannot be checked with the statement select * from user_log_sink;.

Note that what you are verifying is the set of records inserted by this job, not whatever the HBase table already contained. Keep this distinction in mind.
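Besides the hbase shell, the write can also be checked programmatically. Below is a minimal Scala sketch, assuming the HBase 2.x client API is on the classpath; the table venn, column family cf and ZooKeeper host master are taken from the setup above, while the object name and the printed fields are only illustrative.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

// Minimal sketch: scan a few rows of the HBase table that the Flink job writes to.
object ScanVenn {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "master")            // same quorum host as in the sink DDL
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val table = conn.getTable(TableName.valueOf("venn"))
      val scanner = table.getScanner(new Scan().setLimit(5)) // only look at a handful of rows
      for (result <- scanner.asScala) {
        val rowKey   = Bytes.toString(result.getRow)         // the user_id written by the job
        val behavior = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("behavior")))
        println(s"row=$rowKey behavior=$behavior")
      }
      scanner.close()
    } finally {
      conn.close()
    }
  }
}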

Error 1: with the legacy format above, the following error appears:

 [ERROR] Could not execute SQL statement. Reason:

org.apache.flink.table.factories.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Error 2: the following error appears:

java.lang.NoSuchMethodError: org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily(Lorg/apache/flink/table/types/logical/LogicalType;Lorg/apache/flink/table/types/logical/LogicalTypeFamily;)Z

This is a version mismatch: the Flink version in the connector JAR (the 1.17.2 in flink-sql-connector-hbase-2.2-1.17.2.jar) must match the installed Flink version.

Closing remarks

(1) Topic messages are organized one per line, in the format: {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26 01:00:00"}

{"field1": "value1", "field2": "value2", ..., "fieldN": "valueN"}

The topic fields must correspond to the row key and column names of the HBase table, otherwise the data cannot be written correctly.

(2) The real headache is the connector between Flink and HBase: the version correspondence is critical and may have to be found by trial and error.

Flink 1.17.2 corresponds to flink-sql-connector-hbase-2.2-1.17.2.jar.

(3) Very few of the examples found online work as published. The reason is that versions differ and so does the code, but they are still worth consulting. The only workable approach is to gather several references and compare them.

(4) A large task can be split into smaller steps. The task above breaks down into three steps (connect to Kafka, connect to HBase, insert), which narrows down where a problem lies.

Part 2: Table API programming mode

I. Code

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object scala_1 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // default stream time characteristic (processing time)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    // build the StreamTableEnvironment
    val tenv = StreamTableEnvironment.create(env)

    // Kafka source table
    val table_sql =
      """CREATE TABLE st (
        |`user_id` VARCHAR,
        |`item_id` VARCHAR,
        |`category_id` VARCHAR,
        |`behavior` VARCHAR,
        |`ts` TIMESTAMP(3))
        |WITH ('connector' = 'kafka',
        |'topic' = 'user_behavior',
        |'properties.bootstrap.servers' = 'master:9092',
        |'properties.group.id' = 'testGroup',
        |'scan.startup.mode' = 'earliest-offset',
        |'format' = 'json')""".stripMargin
    val f = tenv.executeSql(table_sql)

    // This statement must not be omitted; it acts like a refresh, otherwise
    // no stream data is provided to the sink table sk.
    tenv.toDataStream(tenv.sqlQuery("select * from st"))

    // HBase sink table
    val f2 = tenv.executeSql(
      """CREATE TABLE sk (
        |  user_id VARCHAR,
        |  cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3))
        |) WITH (
        |  'connector' = 'hbase-2.2',
        |  'table-name' = 'venn',
        |  'zookeeper.quorum' = 'master:2181'
        |)""".stripMargin)

    // submit the insert job
    tenv.executeSql(
      """insert into sk
        |select user_id, ROW(item_id, category_id, behavior, ts) as cf from st
        |""".stripMargin)

    env.execute()
  }
}
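When the sink table stays empty, it can help to first confirm that records are actually arriving from Kafka before wiring up the HBase sink. The following is a small self-contained Scala sketch along those lines; it registers the same Kafka source table as the program above and only prints it, and the object name PrintUserBehavior and the job name are only illustrative.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Debug sketch: register the Kafka source table and print it,
// to confirm that records are arriving before the HBase sink is added.
object PrintUserBehavior {
  def main(args: Array[String]): Unit = {
    val env  = StreamExecutionEnvironment.getExecutionEnvironment
    val tenv = StreamTableEnvironment.create(env)

    tenv.executeSql(
      """CREATE TABLE st (
        |`user_id` VARCHAR, `item_id` VARCHAR, `category_id` VARCHAR,
        |`behavior` VARCHAR, `ts` TIMESTAMP(3))
        |WITH ('connector' = 'kafka',
        |'topic' = 'user_behavior',
        |'properties.bootstrap.servers' = 'master:9092',
        |'properties.group.id' = 'testGroup',
        |'scan.startup.mode' = 'earliest-offset',
        |'format' = 'json')""".stripMargin)

    // Convert the query result to a DataStream and print it to the TaskManager stdout.
    tenv.toDataStream(tenv.sqlQuery("select user_id, behavior from st")).print()
    env.execute("print-user-behavior")   // illustrative job name
  }
}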

II. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>FlinkTutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.12</scala.version>
        <hdfs.version>3.2.3</hdfs.version>
        <flink.version>1.17.1</flink.version>
        <kafka.version>3.2.1</kafka.version>
        <!-- further version properties in the original pom: 1.1.5, 5.1.47, 5.1.47, 3.9 -->
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.esotericsoftware</groupId> <artifactId>kryo-shaded</artifactId> <version>4.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId> <artifactId>scala-reflect</artifactId> <version>${scala.version}.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>${scala.version}.12</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.version}</artifactId> <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web</artifactId> <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.version}</artifactId> <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-kafka</artifactId> <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.17.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>1.17.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.17.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_${scala.version}</artifactId> <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_2.12</artifactId> <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hdfs.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-auth</artifactId> <version>${hdfs.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-hbase-2.2</artifactId> <version>1.17.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hbase-2.2</artifactId> <version>1.17.2</version>
        </dependency>
    </dependencies>
</project>
