kettle向redis同步数据

网上kettle向redis同步数据的完整案例不是很多,本文将以案例形式对整个过程进行详细讲解。

一、案例描述

本文以最简单的案例描述,大家在应用过程中可根据实际情况进行调整。现有学生表和成绩表。如何将表中的数据按照如下要求同步至redis? 1、将学生表的数据同步至redis,学生表的id为redis的key 2、按照如下结构同步每个学生的信息以及所关联的成绩。redis的key使用学生id_学生姓名形式。

{

"学生详细信息":[],

"成绩信息":[],

"学生id":"",

"学生姓名":""

}

二、数据准备

1、student表

SET NAMES utf8mb4;

SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------

-- Table structure for student

-- ----------------------------

DROP TABLE IF EXISTS `student`;

CREATE TABLE `student` (

`id` varchar(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,

`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,

`xh` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,

PRIMARY KEY (`id`) USING BTREE

) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------

-- Records of student

-- ----------------------------

INSERT INTO `student` VALUES ('1', '张三', '001');

INSERT INTO `student` VALUES ('2', '李四', '002');

INSERT INTO `student` VALUES ('3', '王五', '003');

SET FOREIGN_KEY_CHECKS = 1;

2、grade表

SET NAMES utf8mb4;

SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------

-- Table structure for grade

-- ----------------------------

DROP TABLE IF EXISTS `grade`;

CREATE TABLE `grade` (

`id` varchar(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,

`classname` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,

`score` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,

`stuid` varchar(11) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,

PRIMARY KEY (`id`) USING BTREE

) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------

-- Records of grade

-- ----------------------------

INSERT INTO `grade` VALUES ('1', '语文', '90', '1');

INSERT INTO `grade` VALUES ('2', '数学', '100', '1');

INSERT INTO `grade` VALUES ('3', '英语', '80', '1');

INSERT INTO `grade` VALUES ('4', '语文', '85', '2');

INSERT INTO `grade` VALUES ('5', '数学', '100', '2');

INSERT INTO `grade` VALUES ('6', '英语', '95', '2');

INSERT INTO `grade` VALUES ('7', '语文', '98', '3');

INSERT INTO `grade` VALUES ('8', '数学', '87', '3');

INSERT INTO `grade` VALUES ('9', '英语', '65', '3');

SET FOREIGN_KEY_CHECKS = 1;

三、工具类准备

1、启动redis 2、使用redis管理工具进行连接

链接:https://pan.baidu.com/s/1a7a2GBV9SGThr0efx8EFSA

提取码:e8wj

3、下载kettle中连接redis的相关jar包(jedis-3.1.0.jar和fastjson-1.2.47.jar)

链接:https://pan.baidu.com/s/1q1jymAEo9mI7GDHNrGuRyw

提取码:4kjw

下载后将jar文件放在kettle根目录下的lib文件中。切记要重启,否则不生效

四、 案例实现

1、案例一:将学生表的数据同步至redis,学生表的id为redis的key

(1)kettle任务 作业: 为了后续方便,将redis相关设置提出。 设置抽取变量 (2)转换内容 转换: 表输入: JSON output:

Java 代码-向redis写数据 可进行设置redis连接、向redis写数据以及设置过期时间等。 java代码如下:

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPoolConfig;

import redis.clients.jedis.Pipeline;

private Jedis jedis=null;

private JedisPool pool=null;

Pipeline pipe = null;

int cache_size=10000; // 批量提交大小

int cur_size=0; // 当前数据缓存量

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {

if (first) {

first = false;

String redis_ip = getVariable("REDIS_IP", "");

String redis_port = getVariable("REDIS_PORT", "");

String redis_password = getVariable("REDIS_PWD", "");

int redis_db = Integer.parseInt(getVariable("REDIS_DB", ""));

cache_size = Integer.valueOf(getVariable("redis.cache_size", "10000"));

logBasic(redis_ip+":"+redis_port);

logBasic("redis_password:"+redis_password);

// 连接池方式

JedisPoolConfig config = new JedisPoolConfig();

config.setMaxIdle(8);

config.setMaxTotal(18);

pool = new JedisPool(config, redis_ip, Integer.valueOf(redis_port), 2000, redis_password);

jedis = pool.getResource();

jedis.select(redis_db);// 切换数据库

pipe = jedis.pipelined(); // 创建pipeline 对象

logBasic("Server is running: " + jedis.ping());

}

Object[] r = getRow();

if (r == null) {

setOutputDone();

pipe.sync();

jedis.close();

pool.close();

return false;

}

r = createOutputRow(r, data.outputRowMeta.size());

String key = get(Fields.In, "id").getString(r);

String value = get(Fields.In, "JsonData").getString(r);

logDebug(key +"***"+ "\t" + value);

// 写入redis

pipe.set(key, value);

//设置redis过期时间:单位为秒

pipe.expire(key, 3600);

cur_size++;

if (cur_size % cache_size == 0 && cur_size > 0) {// 当达到缓存最大值时提交

pipe.sync(); // 同步

cur_size=0; // 复位

}

putRow(data.outputRowMeta, r);

return true;

}

(3)redis效果 使用json解析网址查看的结果如下。

https://www.json.cn/

{

"data":[

{

"xh":"001",

"name":"张三",

"id":"1"

}

]

}

2、案例二:按照如下结构同步每个学生的信息以及所关联的成绩。redis的key使用学生id_学生姓名形式。 (1)作业 基本与案例一类似,不在赘述 (2)获取关键参数 (3)同步业务数据到redis

多余的环节不在赘述,只展示关键点: Java 代码-向redis写数据源码: 如果不对组装结果进行处理,输出的内容如下,比较杂乱。因此我在此处写了工具类对这个结果进行了处理组装。 未处理结果:

{

"data":[

{

"tagjson":"{\"student\":[{\"xh\":\"001\",\"name\":\"张三\",\"id\":\"1\"}]}"

},

{

"tagjson":"{\"grade\":[{\"score\":\"90\",\"classname\":\"语文\",\"stuid\":\"1\",\"id\":\"1\"},{\"score\":\"100\",\"classname\":\"数学\",\"stuid\":\"1\",\"id\":\"2\"},{\"score\":\"80\",\"classname\":\"英语\",\"stuid\":\"1\",\"id\":\"3\"}]}"

}

]

}

处理过程:Java 代码-向redis写数据源码

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPoolConfig;

import redis.clients.jedis.Pipeline;

import unit.TestJsonUntil;

private Jedis jedis=null;

private JedisPool pool=null;

Pipeline pipe = null;

int cache_size=10000; // 批量提交大小

int cur_size=0; // 当前数据缓存量

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {

if (first) {

first = false;

String redis_ip = getVariable("REDIS_IP", "");

String redis_port = getVariable("REDIS_PORT", "");

String redis_password = getVariable("REDIS_PWD", "");

int redis_db = Integer.parseInt(getVariable("REDIS_DB", ""));

cache_size = Integer.valueOf(getVariable("redis.cache_size", "10000"));

logBasic(redis_ip+":"+redis_port);

logBasic("redis_password:"+redis_password);

// 连接池方式

JedisPoolConfig config = new JedisPoolConfig();

config.setMaxIdle(8);

config.setMaxTotal(18);

pool = new JedisPool(config, redis_ip, Integer.valueOf(redis_port), 2000, redis_password);

jedis = pool.getResource();

jedis.select(redis_db);// 切换数据库

pipe = jedis.pipelined(); // 创建pipeline 对象

logBasic("Server is running: " + jedis.ping());

}

Object[] r = getRow();

if (r == null) {

setOutputDone();

pipe.sync();

jedis.close();

pool.close();

return false;

}

r = createOutputRow(r, data.outputRowMeta.size());

String jsondata = get(Fields.In, "JsonData").getString(r);

String value = TestJsonUntil.formatTestJson(jsondata);

String key = TestJsonUntil.getID(value);

logDebug(key +"***"+ "\t" + value);

// 写入redis

pipe.set(key, value);

//设置redis过期时间:单位为秒

pipe.expire(key, 3600);

cur_size++;

if (cur_size % cache_size == 0 && cur_size > 0) {// 当达到缓存最大值时提交

pipe.sync(); // 同步

cur_size=0; // 复位

}

putRow(data.outputRowMeta, r);

return true;

}

所用到的工具类,此处是我写的java组件,打包后放入kettle根目录下的lib中即可。记得放入后需要重启。

package unit;

import com.alibaba.fastjson.JSONArray;

import com.alibaba.fastjson.JSONObject;

/**

* Created with IntelliJ IDEA.

*

* @Author: qzc

* @Date: 2023/03/13/11:51

* @Description:

*/

public class TestJsonUntil {

public static String getID(String str){

String retstr="";

JSONObject json = JSONObject.parseObject(str);

if(null !=json){

retstr= (json.getString("stuid")+"-"+json.getString("name"));

}

return retstr;

}

public static String formatTestJson(String str){

JSONObject jsonObject = new JSONObject();

jsonObject.put("stuid",null);

jsonObject.put("name",null);

jsonObject.put("student",null);

jsonObject.put("grade",null);

try {

JSONObject json = JSONObject.parseObject(str);

if(null != json){

JSONArray jsonArray =JSONArray.parseArray(json.get("data").toString()) ;

if(null != jsonArray && jsonArray.size()>0){

//定义集合

JSONArray stuArr = new JSONArray();

JSONArray grdArr = new JSONArray();

for(int i =0;i

JSONObject jsondata = jsonArray.getJSONObject(i);

if(null != jsondata && null != jsondata.getJSONObject("tagjson") ){

JSONObject tagjson = JSONObject.parseObject(jsondata.getJSONObject("tagjson").toJSONString());

if(null != tagjson ){

JSONArray curjsonArr = new JSONArray();

String currtype=null;

if(null != tagjson.get("student")){

currtype="student";

curjsonArr = JSONArray.parseArray(tagjson.get("student").toString());

}else if(null != tagjson.get("grade")){

currtype="grade";

curjsonArr = JSONArray.parseArray(tagjson.get("grade").toString());

}

if(null != curjsonArr && curjsonArr.size()>0){

for(int j =0;j

JSONObject tag = curjsonArr.getJSONObject(j);

if(null == jsonObject.get("name") && null != currtype && "student".equals(currtype)){

jsonObject.put("name",tag.get("name"));

}

if(null == jsonObject.get("stuid") && null != currtype && "student".equals(currtype)){

jsonObject.put("stuid",tag.get("id"));

}

if(null != currtype && "grade".equals(currtype)){

grdArr.add(tag);

}else if(null != currtype && "student".equals(currtype)){

stuArr.add(tag);

}

}

}

}

}

}

//将处理后的数据放入jsonobject

jsonObject.put("grade",grdArr);

jsonObject.put("student",stuArr);

}

}

}catch (Exception e){

e.printStackTrace();

}

return jsonObject.toJSONString();

}

//

// public static void main(String[] args) {

// String str ="{\"data\":[{\"tagjson\":\"{\\\"student\\\":[{\\\"xh\\\":\\\"001\\\",\\\"name\\\":\\\"张三\\\",\\\"id\\\":\\\"1\\\"}]}\"},{\"tagjson\":\"{\\\"grade\\\":[{\\\"score\\\":\\\"90\\\",\\\"classname\\\":\\\"语文\\\",\\\"stuid\\\":\\\"1\\\",\\\"id\\\":\\\"1\\\"},{\\\"score\\\":\\\"100\\\",\\\"classname\\\":\\\"数学\\\",\\\"stuid\\\":\\\"1\\\",\\\"id\\\":\\\"2\\\"},{\\\"score\\\":\\\"80\\\",\\\"classname\\\":\\\"英语\\\",\\\"stuid\\\":\\\"1\\\",\\\"id\\\":\\\"3\\\"}]}\"}]}";

// String strs = TestJsonUntil.formatTestJson(str);

// String str2 = TestJsonUntil.getID(strs);

// System.out.println(str2);

//

// }

}

处理完成后输出的结果如下: 这样输出显然结构很清晰。

{

"stuid":"1",

"student":[

{

"xh":"001",

"name":"张三",

"id":"1"

}

],

"grade":[

{

"score":"90",

"classname":"语文",

"stuid":"1",

"id":"1"

},

{

"score":"100",

"classname":"数学",

"stuid":"1",

"id":"2"

},

{

"score":"80",

"classname":"英语",

"stuid":"1",

"id":"3"

}

],

"name":"张三"

}

五、总结

总体来说整个过程不难,大家可以根据自己的实际需要进行调整。在正常的生成过程中业务往往及其复杂,单纯的靠kettle组件处理一些复杂的数据可能比较麻烦,这时候我觉得可以借助java组件处理业务逻辑,这样会使整个作业相对来说更加简洁方便。

精彩链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: