一、JAVA 调用 Kettle 转换
在写 Java 程序前,先使用 Spoon 设计一下转换的过程,这里以拉取 CSDN 文章列表存入 txt 文本为例:
拉取的接口为 https://blog.csdn.net/community/home-api/v1/get-business-list?page=1&size=20&businessType=blog&orderby=&noMore=false&year=&month=&username=qq_43692950
返回格式如下:
{
"code": 200,
"message": "success",
"traceId": "b1e8ccb0-2e39-4834-bacd-b52a260bb521",
"data": {
"list": [
{
"articleId": 130450076,
"title": "ETL工具 - Kettle 查询、连接、统计、脚本算子介绍",
"description": "连接算子一般将多个数据集通过关键字进行连接,类似 `SQL` 中的连接操作,统计算子可以提供数据的采样和统计功能,脚本算子可以通过程序代码完成一些复杂的操作",
"url": "https://xiaobichao.blog.csdn.net/article/details/130450076",
"type": 1,
"top": false,
"forcePlan": false,
"viewCount": 313,
"commentCount": 0,
"editUrl": "https://editor.csdn.net/md?articleId=130450076",
"postTime": "2023-04-30 23:12:13",
"diggCount": 1,
"formatTime": "前天 23:12",
"picList": [
"https://img-blog.csdnimg.cn/2e817e14046f4cba9663c89978198f12.png"
]
}
],
"total": 287
}
}
1.1 转换设计过程
这里 url 通过变量的形式传递进来,整体的转换设计如下:
获取变量:
REST client:
JSON input:
字段选择:
文本文件输出:
设计完后,保存 ktr 脚本:
1.2 Java 调用转换脚本
新建一个 Mavne 项目,在 pom 中引入下面依赖:
在 resources 下新建 kettle-password-encoder-plugins.xml 文件,内容如下:
Java 调用逻辑:
public class RunTrans {
public static void main(String[] args) {
try {
// 指定插件位置,注意改为你的安装目录
StepPluginType.getInstance().getPluginFolders().
add(new PluginFolder("D:/data-integration_9_3/plugins/", false, true));
// 初始化 kettle 环境
KettleEnvironment.init();
} catch (KettleException e) {
e.printStackTrace();
}
String ktrPath = "D:/data/job/trans.ktr";
String url = "https://blog.csdn.net/community/home-api/v1/get-business-list?page=1&size=20&businessType=blog&orderby=&noMore=false&year=&month=&username=qq_43692950";
// 添加变量
Map
variableMap.put("url", url);
Boolean res = runTrans(ktrPath, variableMap, null);
System.out.println("转换执行结果:" + res);
}
private static Boolean runTrans(String ktrPath, Map
try {
// 加载 ktr 文件
TransMeta transMeta = new TransMeta(ktrPath, (Repository) null);
Trans trans = new Trans(transMeta);
trans.setLogLevel(LogLevel.MINIMAL);
// 变量
if (Objects.nonNull(variableMap) && !variableMap.isEmpty()) {
variableMap.forEach(trans::setVariable);
}
// 参数
if (Objects.nonNull(parameterMap) && !parameterMap.isEmpty()) {
parameterMap.forEach((k, v) -> {
try {
trans.setParameterValue(k, v);
} catch (UnknownParamException e) {
e.printStackTrace();
}
});
}
// 监听执行日志
KettleLogStore.getAppender().addLoggingEventListener(new KettleLoggingEventListener() {
@Override
public void eventAdded(KettleLoggingEvent logs) {
System.out.println("Kettle 日志:level = " + logs.getLevel() + " , time = " + logs.getTimeStamp() + " , message = " + logs.getMessage());
}
});
// 执行转换
trans.execute(new String[0]);
// 等待执行完成
trans.waitUntilFinished();
// 是否执行成功
return trans.getErrors() == 0;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
}
下面到输出目录查看结果:
二、JAVA 调用 Kettle 任务
任务就调用上面的转换,进行测试:
保存 kjb 文件:
Java 调用逻辑:
public class RunJob {
public static void main(String[] args) {
try {
// 指定插件位置
StepPluginType.getInstance().getPluginFolders().
add(new PluginFolder("D:/data-integration_9_3/plugins/", false, true));
// 初始化 kettle 环境
KettleEnvironment.init();
} catch (KettleException e) {
e.printStackTrace();
}
String kjbPath = "D:/data/job/job.kjb";
String url = "https://blog.csdn.net/community/home-api/v1/get-business-list?page=2&size=20&businessType=blog&orderby=&noMore=false&year=&month=&username=qq_43692950";
// 添加变量
Map
variableMap.put("url", url);
Boolean res = runJob(kjbPath, variableMap, null);
System.out.println("转换执行结果:" + res);
}
private static Boolean runJob(String kjbPath, Map
try {
JobMeta jobMeta = new JobMeta(kjbPath, null);
Job job = new Job(null, jobMeta);
job.setLogLevel(LogLevel.MINIMAL);
// 变量
if (Objects.nonNull(variableMap) && !variableMap.isEmpty()) {
variableMap.forEach(job::setVariable);
}
// 参数
if (Objects.nonNull(parameterMap) && !parameterMap.isEmpty()) {
parameterMap.forEach((k, v) -> {
try {
job.setParameterValue(k, v);
} catch (UnknownParamException e) {
e.printStackTrace();
}
});
}
// 监听执行日志
KettleLogStore.getAppender().addLoggingEventListener(new KettleLoggingEventListener() {
@Override
public void eventAdded(KettleLoggingEvent logs) {
System.out.println("Kettle 日志:level = " + logs.getLevel() + " , time = " + logs.getTimeStamp() + " , message = " + logs.getMessage());
}
});
// 执行作业
job.start();
// 等待执行完成
job.waitUntilFinished();
// 是否执行成功
return job.getErrors() == 0;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
}
下面到输出目录查看结果:
好文阅读
发表评论