main
object MyHbaseSinkTest {
def main(args: Array[String]): Unit = {
//环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
/**
* 获取基础参数
*/
val bootstrapserversnew = Contant.BOOTSTRAP_SERVERS_NEW
import org.apache.flink.api.scala._
/**
* 定义kafka-source得到DataStream
*/
val topics = "vs_merging_middle_topic"
//将kafka中数据反序列化,
val valueDeserializer: DeserializationSchema[String] = new SimpleStringSchema()
val properties = new Properties()
properties.put("bootstrap.servers", bootstrapserversnew)
properties.put("group.id", "flink_hbase_sink_consumer2")
properties.put("auto.offset.reset", Contant.AUTO_OFFSET_RESET_VALUE)
println(Contant.BOOTSTRAP_SERVERS_NEW)
val kafkaSinkDStream = env.addSource(new FlinkKafkaConsumer[String](topics, valueDeserializer, properties))
kafkaSinkDStream.flatMap(data => {
val jsonData = JSON.parseObject(data)
val table = jsonData.getString("table")
val rowkey = jsonData.getJSONObject("data").get("id").toString
val jsonResult = jsonData.getJSONObject("data")
import scala.collection.JavaConversions._
jsonResult.keySet().map(key => {
HbaseValueBean("variable_system:" + table, rowkey, "info", key, jsonResult.get(key).toString)
})
})
.addSink(MyHbaseSinkSingle)
env.execute(this.getClass.getSimpleName)
}
}
bean
case class HbaseValueBean(
tableName : String ,
rowkey : String ,
family : String ,
column : String ,
value : String
)
HBase-Sink
object MyHbaseSinkSingle extends RichSinkFunction[HbaseValueBean] {
private var putsResult: List[Put] = collection.immutable.List[Put]()
private var hbaseConn: Connection = null
private var maxSize = 100
/**
* 开始,打开连接
*
* @param parameters
*/
override def open(parameters: Configuration): Unit = {
hbaseConn = HBaseUtils.getHbaseConnNotPoolNew
maxSize = parameters.getInteger("maxSize" , 100)
// val lastInvokeTime = System.currentTimeMillis
}
/**
* 数据处理
*
* @param value
* @param context
*/
override def invoke(value: HbaseValueBean, context: SinkFunction.Context[_]): Unit = {
if (value.tableName == "variable_system:risk_task") {
import scala.collection.JavaConversions._
/**
* rowkey与put
*/
val put = new Put(value.rowkey.getBytes())
PutUtils.setDataString(put , value.family , value.column , value.value)
putsResult = putsResult :+ put
println(s"puts-size:${putsResult.size}")
/**
* 判断输出
*/
if (putsResult.size() >= maxSize) {
val table = hbaseConn.getTable(TableName.valueOf(value.tableName))
table.put(putsResult)
println("进行sink")
println(s"table:${value.tableName} , 已经达到:${maxSize} , 已存储;")
println(s"puts:${putsResult}")
/**
* 因为Java与Scala集合转换,所以这里是没有scala的清除方法的
*/
putsResult = collection.immutable.List[Put]()
table.close()
}
}
}
/**
* 满足条件输出数据并且关闭连接
*/
override def close(): Unit = {
hbaseConn.close()
}
}
HBase-Connect
def getHbaseConnNotPoolNew: Connection = {
var conf: Configuration = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum", "host:port")
conf.set("hbase.zookeeper.property.clientPort", "port")
conf.set("hbase.master", "16000")
conf.set("hbase.rootdir", "dir")
val conn = ConnectionFactory.createConnection(conf)
conn
}
HBase-Utiles
object PutUtils {
def setDataString(put: Put,cf:String,col:String,data:String): Unit ={
put.addColumn(Bytes.toBytes(cf) , Bytes.toBytes(col) , Bytes.toBytes(data))
}
def setDataStringGetPut(put: Put,cf:String,col:String,data:String): Put ={
put.addColumn(Bytes.toBytes(cf) , Bytes.toBytes(col) , Bytes.toBytes(if(data!= null && data != "") data else "null"))
}
}
部署
#!/bin/bash
flink run -m yarn-cluster \
-c com.xxxx.flink.MyHbaseSinkTest \
-p 8 \
/home/cdh/xxxx/2020/11/FlinkToKuduDemo/realtime_source_dw.jar
注意事项
不能够以Spark Streaming的方式来理解Fink的Source和Sink,如果使用客户端是无法针对每个分区进行连接数据处理的,使用Sink可以建立全局连接进行数据存储;由上面,全局连接,全局数据处理导致的问题就是不能够每个分区建立线程不安全的集合进行数据存储,必须使用线程安全的集合,也就是不可变的集合进行数据处理,那么使用了Scala之间的集合转换就要注意方法的使用,很多Java结合的方法Scala是没有的,所以一般的清空操作还是使用地址替换重新复制覆盖的方式来进行addSink的之前的DataStream的数据类型一定是与自定义Sink的操作类型一致的,是针对最后的数据类型进行处理存储的;
好文阅读
发表评论