Flink 为很多外部存储提供了现成的 sink connector,比如:es、kafka 等。但有些场景不仅没有对应的 sink,而且有时候还需要在 sink 的时候做一些查询工作,这时就需要自定义 sink。

自定义sink需要继承 RichSinkFunction,重写open、invoke、close三个方法。open方法主要实现一些公共资源的开启工作,如mongo、solr的连接客户端;invoke会在每条数据进入后调用,主要写一些数据的转化、插入、查询等具体的实际业务;close方法在任务结束时调用,用来释放open中创建的资源(例如关闭连接客户端)。

下面我给大家两个实例,一个是sink solr的,一个是sink mongo的。

1、solr示例:

这里涉及到solr的客户端操作,我是使用solrj来封装的。

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import org.apache.solr.client.solrj.SolrQuery;

import org.apache.solr.client.solrj.impl.HttpSolrClient;

import org.apache.solr.client.solrj.request.QueryRequest;

import org.apache.solr.client.solrj.request.UpdateRequest;

import java.util.*;

/**
 * Custom Flink sink that talks to Solr through SolrJ.
 *
 * <p>For every incoming record it runs a (hard-coded, example) query against the
 * {@code fd_table} collection and re-submits each bean returned by that query as an
 * update followed by a commit.
 *
 * <p>The Solr client is created in {@link #open(Configuration)}, re-created after a
 * failed {@link #invoke(ExempleBean, Context)} and released in {@link #close()}.
 */
public class SolrSink extends RichSinkFunction<ExempleBean> {

    /** Solr collection that is both queried and updated. */
    private static final String COLLECTION = "fd_table";

    /** Property key holding the Solr base URL. */
    private static final String SOLR_URL_KEY = "com.medbook.en.solr.url";

    // The fields below are only populated by client(), which is first called from open();
    // they are marked transient so they are never part of the serialized function object.
    private transient HttpSolrClient solrClient;

    private transient String username;

    private transient String password;

    /**
     * Creates the Solr client before any record is processed.
     *
     * @param parameters configuration handed in by Flink, forwarded to the superclass
     * @throws Exception if the superclass or the client initialisation fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        client();
    }

    /**
     * Queries Solr and writes every bean returned by the query back to the collection.
     *
     * <p>Failures are handled best-effort: the error is printed, the client is discarded
     * and re-created, and the current record is not retried.
     *
     * @param value   the incoming record (not used by the example query)
     * @param context sink context supplied by Flink
     * @throws Exception declared by the sink contract; re-creating the client may throw
     */
    @Override
    public void invoke(ExempleBean value, Context context) throws Exception {
        String queryString = "id:1";
        try {
            List<ExempleBean> exeples = new QueryRequest(new SolrQuery(queryString).setRows(1))
                    .process(solrClient, COLLECTION)
                    .getBeans(ExempleBean.class);
            for (ExempleBean exem : exeples) {
                // Convert the Java bean into a Solr input document and submit it.
                UpdateRequest request = new UpdateRequest();
                request.setBasicAuthCredentials(username, password);
                request.add(solrClient.getBinder().toSolrInputDocument(exem));
                request.commit(solrClient, COLLECTION);
                System.out.println("执行完成");
            }
        } catch (Exception e) {
            // Surface the failure instead of dropping it silently, then start over
            // with a fresh client for the next record.
            e.printStackTrace();
            resetClient();
        }
    }

    /**
     * Releases the Solr client. Errors while closing are printed, not propagated.
     *
     * @throws Exception if the superclass close fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (null != solrClient) {
                solrClient.close();
                System.out.println("关闭啦!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            solrClient = null;
        }
    }

    /**
     * Lazily creates the Solr client; does nothing when a client already exists.
     *
     * <p>Loads the Solr user name and password through {@code SolrClientUtil} and the
     * Solr URL through {@code ConfigUtil}.
     *
     * @throws IllegalStateException if the Solr URL property is not configured
     */
    public void client() {
        synchronized (this) {
            if (null != solrClient) {
                return;
            }
            // Load the Solr username/password referenced by the configuration file.
            Map<String, String> keyMap = SolrClientUtil.getSolrUserPassword(
                    "com.medbook.en.solr.username",
                    "com.medbook.en.solr.password");
            username = keyMap.get("username");
            password = keyMap.get("password");

            // Load the Solr connection URL.
            Properties prop = ConfigUtil.getProperties();
            Object solrUrl = prop.get(SOLR_URL_KEY);
            if (null == solrUrl) {
                throw new IllegalStateException("Missing configuration property: " + SOLR_URL_KEY);
            }
            solrClient = new HttpSolrClient.Builder(solrUrl.toString())
                    // Requests that exceed these timeouts are aborted.
                    .withConnectionTimeout(360000)
                    .withSocketTimeout(360000)
                    .build();
            System.out.println("创建了连接");
        }
    }

    /** Closes the current client (if any) and creates a new one, even when closing fails. */
    private void resetClient() {
        if (null != solrClient) {
            try {
                solrClient.close();
                System.out.println("里面关闭啦!");
            } catch (Exception closeError) {
                closeError.printStackTrace();
            } finally {
                // Always drop the reference so client() builds a fresh instance.
                solrClient = null;
            }
        }
        client();
    }
}

2、mongo示例: 这个示例查询了mongo中的数据,转化成对应的javaBean,用查到的字段更新传入的javaBean以后,再转化成document,存入mongodb。

import com.google.gson.Gson;

import com.google.gson.GsonBuilder;

import com.medbook.Bo.ExampleBean;

import com.medbook.utils.ConfigUtil;

import com.mongodb.MongoClient;

import com.mongodb.MongoClientURI;

import com.mongodb.client.MongoCollection;

import com.mongodb.client.MongoDatabase;

import com.mongodb.client.model.Filters;

import com.mongodb.client.model.UpdateOptions;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import org.bson.Document;

import org.bson.conversions.Bson;

import java.util.*;

/**
 * Custom Flink sink that enriches each incoming bean with data looked up in MongoDB and
 * upserts the result into the {@code fd_table} collection of the {@code example_data}
 * database.
 *
 * <p>The Mongo client is created in {@link #open(Configuration)}, re-created after a
 * failed {@link #invoke(ExampleBean, Context)} and released in {@link #close()}.
 */
public class MongoSink extends RichSinkFunction<ExampleBean> {

    /** Property key holding the MongoDB connection URI. */
    private static final String MONGO_URI_KEY = "com.medbook.mongo.foreign.uri";

    // Both fields are created in open(); they are marked transient so they are never
    // part of the serialized function object.
    private transient MongoClient mongoClient;

    private transient Gson gson;

    /**
     * Creates the JSON converter and the Mongo client before any record is processed.
     *
     * @param parameters configuration handed in by Flink, forwarded to the superclass
     * @throws Exception if the superclass or the client initialisation fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Dates are written to / read from JSON strings in this fixed format.
        gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
        client();
    }

    /**
     * Looks up a relation document, copies its name onto the incoming bean and upserts the
     * bean into {@code fd_table}, keyed by the bean's id.
     *
     * <p>If no relation document is found the record is skipped. Any other failure is
     * printed and the client is discarded and re-created; the record is not retried.
     *
     * @param bean    the incoming record; its name is overwritten before it is stored
     * @param context sink context supplied by Flink
     * @throws Exception declared by the sink contract; re-creating the client may throw
     */
    @Override
    public void invoke(ExampleBean bean, Context context) throws Exception {
        try {
            MongoDatabase db = mongoClient.getDatabase("example_data");
            MongoCollection<Document> tableColl = db.getCollection("fd_table");
            MongoCollection<Document> relationColl = db.getCollection("fd_relation");

            // NOTE(review): the single-argument Filters.eq filters on the _id field, so this
            // matches _id == "id:2". If a field named "id" was intended, this should be
            // Filters.eq("id", <value>) -- confirm against the collection schema.
            Bson jourQuery = Filters.eq("id:2");
            Document doc = relationColl.find(jourQuery).first();
            if (null == doc) {
                // Nothing to enrich with; skip the record without tearing down a healthy
                // connection (a null here used to surface as a NullPointerException).
                System.err.println("MongoSink: no matching document in fd_relation, record skipped");
                return;
            }

            // Convert the Mongo document that was found into a Java bean.
            RelationBean relationBean = gson.fromJson(doc.toJson(), RelationBean.class);
            // Copy the looked-up field onto the incoming bean.
            bean.setName(relationBean.getName());

            // Bean -> JSON (via Gson) -> Mongo Document.
            String json = gson.toJson(bean);
            Document document = Document.parse(json);

            // Upsert the document into fd_table, keyed by the bean's id.
            Bson query = Filters.eq("id", bean.getId());
            tableColl.replaceOne(query, document, new UpdateOptions().upsert(true));
            System.out.println("执行完成!");
        } catch (Exception e) {
            e.printStackTrace();
            resetClient();
        }
    }

    /**
     * Releases the Mongo client. Errors while closing are printed, not propagated.
     *
     * @throws Exception if the superclass close fails
     */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (null != mongoClient) {
                mongoClient.close();
                System.out.println("关闭啦!");
            }
        } catch (Exception e) {
            // Previously swallowed silently; keep close() best-effort but leave a trace.
            e.printStackTrace();
        } finally {
            mongoClient = null;
        }
    }

    /**
     * Lazily creates the Mongo client from the URI found in the configuration file; does
     * nothing when a client already exists.
     *
     * @throws IllegalStateException if the Mongo URI property is not configured
     */
    public void client() {
        synchronized (this) {
            if (null != mongoClient) {
                return;
            }
            Properties prop = ConfigUtil.getProperties();
            // Read the Mongo URI from the configuration file.
            String uri = prop.getProperty(MONGO_URI_KEY);
            if (null == uri) {
                throw new IllegalStateException("Missing configuration property: " + MONGO_URI_KEY);
            }
            mongoClient = new MongoClient(new MongoClientURI(uri));
        }
    }

    /** Closes the current client (if any) and creates a new one, even when closing fails. */
    private void resetClient() {
        if (null != mongoClient) {
            try {
                mongoClient.close();
                System.out.println("里面关闭啦!");
            } catch (Exception closeError) {
                closeError.printStackTrace();
            } finally {
                // Always drop the reference so client() builds a fresh instance.
                mongoClient = null;
            }
        }
        client();
    }
}

精彩内容

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: