kettle向redis同步数据
网上kettle向redis同步数据的完整案例不是很多,本文将以案例形式对整个过程进行详细讲解。
一、案例描述
本文以最简单的案例描述,大家在应用过程中可根据实际情况进行调整。现有学生表和成绩表。如何将表中的数据按照如下要求同步至redis? 1、将学生表的数据同步至redis,学生表的id为redis的key 2、按照如下结构同步每个学生的信息以及所关联的成绩。redis的key使用学生id_学生姓名形式。
{
"学生详细信息":[],
"成绩信息":[],
"学生id":"",
"学生姓名":""
}
二、数据准备
1、student表
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for student
-- ----------------------------
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
`id` varchar(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`xh` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of student
-- ----------------------------
INSERT INTO `student` VALUES ('1', '张三', '001');
INSERT INTO `student` VALUES ('2', '李四', '002');
INSERT INTO `student` VALUES ('3', '王五', '003');
SET FOREIGN_KEY_CHECKS = 1;
2、grade表
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for grade
-- ----------------------------
DROP TABLE IF EXISTS `grade`;
CREATE TABLE `grade` (
`id` varchar(11) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`classname` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`score` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`stuid` varchar(11) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of grade
-- ----------------------------
INSERT INTO `grade` VALUES ('1', '语文', '90', '1');
INSERT INTO `grade` VALUES ('2', '数学', '100', '1');
INSERT INTO `grade` VALUES ('3', '英语', '80', '1');
INSERT INTO `grade` VALUES ('4', '语文', '85', '2');
INSERT INTO `grade` VALUES ('5', '数学', '100', '2');
INSERT INTO `grade` VALUES ('6', '英语', '95', '2');
INSERT INTO `grade` VALUES ('7', '语文', '98', '3');
INSERT INTO `grade` VALUES ('8', '数学', '87', '3');
INSERT INTO `grade` VALUES ('9', '英语', '65', '3');
SET FOREIGN_KEY_CHECKS = 1;
三、工具类准备
1、启动redis 2、使用redis管理工具进行连接
链接:https://pan.baidu.com/s/1a7a2GBV9SGThr0efx8EFSA
提取码:e8wj
3、下载kettle中连接redis的相关jar包(jedis-3.1.0.jar和fastjson-1.2.47.jar)
链接:https://pan.baidu.com/s/1q1jymAEo9mI7GDHNrGuRyw
提取码:4kjw
下载后将jar文件放在kettle根目录下的lib文件中。切记要重启,否则不生效
四、 案例实现
1、案例一:将学生表的数据同步至redis,学生表的id为redis的key
(1)kettle任务 作业: 为了后续方便,将redis相关设置提出。 设置抽取变量 (2)转换内容 转换: 表输入: JSON output:
Java 代码-向redis写数据 可进行设置redis连接、向redis写数据以及设置过期时间等。 java代码如下:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;
private Jedis jedis=null;
private JedisPool pool=null;
Pipeline pipe = null;
int cache_size=10000; // 批量提交大小
int cur_size=0; // 当前数据缓存量
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
if (first) {
first = false;
String redis_ip = getVariable("REDIS_IP", "");
String redis_port = getVariable("REDIS_PORT", "");
String redis_password = getVariable("REDIS_PWD", "");
int redis_db = Integer.parseInt(getVariable("REDIS_DB", ""));
cache_size = Integer.valueOf(getVariable("redis.cache_size", "10000"));
logBasic(redis_ip+":"+redis_port);
logBasic("redis_password:"+redis_password);
// 连接池方式
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(8);
config.setMaxTotal(18);
pool = new JedisPool(config, redis_ip, Integer.valueOf(redis_port), 2000, redis_password);
jedis = pool.getResource();
jedis.select(redis_db);// 切换数据库
pipe = jedis.pipelined(); // 创建pipeline 对象
logBasic("Server is running: " + jedis.ping());
}
Object[] r = getRow();
if (r == null) {
setOutputDone();
pipe.sync();
jedis.close();
pool.close();
return false;
}
r = createOutputRow(r, data.outputRowMeta.size());
String key = get(Fields.In, "id").getString(r);
String value = get(Fields.In, "JsonData").getString(r);
logDebug(key +"***"+ "\t" + value);
// 写入redis
pipe.set(key, value);
//设置redis过期时间:单位为秒
pipe.expire(key, 3600);
cur_size++;
if (cur_size % cache_size == 0 && cur_size > 0) {// 当达到缓存最大值时提交
pipe.sync(); // 同步
cur_size=0; // 复位
}
putRow(data.outputRowMeta, r);
return true;
}
(3)redis效果 使用json解析网址查看的结果如下。
https://www.json.cn/
{
"data":[
{
"xh":"001",
"name":"张三",
"id":"1"
}
]
}
2、案例二:按照如下结构同步每个学生的信息以及所关联的成绩。redis的key使用学生id_学生姓名形式。 (1)作业 基本与案例一类似,不在赘述 (2)获取关键参数 (3)同步业务数据到redis
多余的环节不在赘述,只展示关键点: Java 代码-向redis写数据源码: 如果不对组装结果进行处理,输出的内容如下,比较杂乱。因此我在此处写了工具类对这个结果进行了处理组装。 未处理结果:
{
"data":[
{
"tagjson":"{\"student\":[{\"xh\":\"001\",\"name\":\"张三\",\"id\":\"1\"}]}"
},
{
"tagjson":"{\"grade\":[{\"score\":\"90\",\"classname\":\"语文\",\"stuid\":\"1\",\"id\":\"1\"},{\"score\":\"100\",\"classname\":\"数学\",\"stuid\":\"1\",\"id\":\"2\"},{\"score\":\"80\",\"classname\":\"英语\",\"stuid\":\"1\",\"id\":\"3\"}]}"
}
]
}
处理过程:Java 代码-向redis写数据源码
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Pipeline;
import unit.TestJsonUntil;
private Jedis jedis=null;
private JedisPool pool=null;
Pipeline pipe = null;
int cache_size=10000; // 批量提交大小
int cur_size=0; // 当前数据缓存量
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
if (first) {
first = false;
String redis_ip = getVariable("REDIS_IP", "");
String redis_port = getVariable("REDIS_PORT", "");
String redis_password = getVariable("REDIS_PWD", "");
int redis_db = Integer.parseInt(getVariable("REDIS_DB", ""));
cache_size = Integer.valueOf(getVariable("redis.cache_size", "10000"));
logBasic(redis_ip+":"+redis_port);
logBasic("redis_password:"+redis_password);
// 连接池方式
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxIdle(8);
config.setMaxTotal(18);
pool = new JedisPool(config, redis_ip, Integer.valueOf(redis_port), 2000, redis_password);
jedis = pool.getResource();
jedis.select(redis_db);// 切换数据库
pipe = jedis.pipelined(); // 创建pipeline 对象
logBasic("Server is running: " + jedis.ping());
}
Object[] r = getRow();
if (r == null) {
setOutputDone();
pipe.sync();
jedis.close();
pool.close();
return false;
}
r = createOutputRow(r, data.outputRowMeta.size());
String jsondata = get(Fields.In, "JsonData").getString(r);
String value = TestJsonUntil.formatTestJson(jsondata);
String key = TestJsonUntil.getID(value);
logDebug(key +"***"+ "\t" + value);
// 写入redis
pipe.set(key, value);
//设置redis过期时间:单位为秒
pipe.expire(key, 3600);
cur_size++;
if (cur_size % cache_size == 0 && cur_size > 0) {// 当达到缓存最大值时提交
pipe.sync(); // 同步
cur_size=0; // 复位
}
putRow(data.outputRowMeta, r);
return true;
}
所用到的工具类,此处是我写的java组件,打包后放入kettle根目录下的lib中即可。记得放入后需要重启。
package unit;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
/**
* Created with IntelliJ IDEA.
*
* @Author: qzc
* @Date: 2023/03/13/11:51
* @Description:
*/
public class TestJsonUntil {
public static String getID(String str){
String retstr="";
JSONObject json = JSONObject.parseObject(str);
if(null !=json){
retstr= (json.getString("stuid")+"-"+json.getString("name"));
}
return retstr;
}
public static String formatTestJson(String str){
JSONObject jsonObject = new JSONObject();
jsonObject.put("stuid",null);
jsonObject.put("name",null);
jsonObject.put("student",null);
jsonObject.put("grade",null);
try {
JSONObject json = JSONObject.parseObject(str);
if(null != json){
JSONArray jsonArray =JSONArray.parseArray(json.get("data").toString()) ;
if(null != jsonArray && jsonArray.size()>0){
//定义集合
JSONArray stuArr = new JSONArray();
JSONArray grdArr = new JSONArray();
for(int i =0;i JSONObject jsondata = jsonArray.getJSONObject(i); if(null != jsondata && null != jsondata.getJSONObject("tagjson") ){ JSONObject tagjson = JSONObject.parseObject(jsondata.getJSONObject("tagjson").toJSONString()); if(null != tagjson ){ JSONArray curjsonArr = new JSONArray(); String currtype=null; if(null != tagjson.get("student")){ currtype="student"; curjsonArr = JSONArray.parseArray(tagjson.get("student").toString()); }else if(null != tagjson.get("grade")){ currtype="grade"; curjsonArr = JSONArray.parseArray(tagjson.get("grade").toString()); } if(null != curjsonArr && curjsonArr.size()>0){ for(int j =0;j JSONObject tag = curjsonArr.getJSONObject(j); if(null == jsonObject.get("name") && null != currtype && "student".equals(currtype)){ jsonObject.put("name",tag.get("name")); } if(null == jsonObject.get("stuid") && null != currtype && "student".equals(currtype)){ jsonObject.put("stuid",tag.get("id")); } if(null != currtype && "grade".equals(currtype)){ grdArr.add(tag); }else if(null != currtype && "student".equals(currtype)){ stuArr.add(tag); } } } } } } //将处理后的数据放入jsonobject jsonObject.put("grade",grdArr); jsonObject.put("student",stuArr); } } }catch (Exception e){ e.printStackTrace(); } return jsonObject.toJSONString(); } // // public static void main(String[] args) { // String str ="{\"data\":[{\"tagjson\":\"{\\\"student\\\":[{\\\"xh\\\":\\\"001\\\",\\\"name\\\":\\\"张三\\\",\\\"id\\\":\\\"1\\\"}]}\"},{\"tagjson\":\"{\\\"grade\\\":[{\\\"score\\\":\\\"90\\\",\\\"classname\\\":\\\"语文\\\",\\\"stuid\\\":\\\"1\\\",\\\"id\\\":\\\"1\\\"},{\\\"score\\\":\\\"100\\\",\\\"classname\\\":\\\"数学\\\",\\\"stuid\\\":\\\"1\\\",\\\"id\\\":\\\"2\\\"},{\\\"score\\\":\\\"80\\\",\\\"classname\\\":\\\"英语\\\",\\\"stuid\\\":\\\"1\\\",\\\"id\\\":\\\"3\\\"}]}\"}]}"; // String strs = TestJsonUntil.formatTestJson(str); // String str2 = TestJsonUntil.getID(strs); // System.out.println(str2); // // } } 处理完成后输出的结果如下: 这样输出显然结构很清晰。 { "stuid":"1", "student":[ { "xh":"001", "name":"张三", "id":"1" } ], "grade":[ { "score":"90", "classname":"语文", "stuid":"1", "id":"1" }, { "score":"100", "classname":"数学", "stuid":"1", "id":"2" }, { "score":"80", "classname":"英语", "stuid":"1", "id":"3" } ], "name":"张三" } 五、总结 总体来说整个过程不难,大家可以根据自己的实际需要进行调整。在正常的生成过程中业务往往及其复杂,单纯的靠kettle组件处理一些复杂的数据可能比较麻烦,这时候我觉得可以借助java组件处理业务逻辑,这样会使整个作业相对来说更加简洁方便。 精彩链接
发表评论