I. Calling a Kettle Transformation from Java

Before writing any Java, design the transformation in Spoon. The example here pulls a user's CSDN article list and writes it to a text file.

The API endpoint is: https://blog.csdn.net/community/home-api/v1/get-business-list?page=1&size=20&businessType=blog&orderby=&noMore=false&year=&month=&username=qq_43692950
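Only `page`, `size` and `username` change between requests in this example; the job in Part II uses `page=2`, for instance. The sketch below is a minimal illustration of building that URL in Java before passing it to Kettle as the `url` variable. The class `CsdnListUrl` and the method `buildListUrl` are hypothetical helpers, not part of Kettle. The remaining query parameters are copied verbatim from the URL above.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class CsdnListUrl {

    // Hypothetical helper: builds the article-list URL used in this example.
    // Only page, size and username vary; the other query parameters are
    // copied verbatim from the endpoint shown above.
    static String buildListUrl(int page, int size, String username) {
        return "https://blog.csdn.net/community/home-api/v1/get-business-list"
                + "?page=" + page
                + "&size=" + size
                + "&businessType=blog&orderby=&noMore=false&year=&month="
                + "&username=" + URLEncoder.encode(username, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Reproduces the page-1 URL from the text
        System.out.println(buildListUrl(1, 20, "qq_43692950"));
    }
}
```

The value returned here would then go into the `url` entry of the variable map in the code below.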

It returns JSON in the following format:

{
  "code": 200,
  "message": "success",
  "traceId": "b1e8ccb0-2e39-4834-bacd-b52a260bb521",
  "data": {
    "list": [
      {
        "articleId": 130450076,
        "title": "ETL工具 - Kettle 查询、连接、统计、脚本算子介绍",
        "description": "连接算子一般将多个数据集通过关键字进行连接,类似 `SQL` 中的连接操作,统计算子可以提供数据的采样和统计功能,脚本算子可以通过程序代码完成一些复杂的操作",
        "url": "https://xiaobichao.blog.csdn.net/article/details/130450076",
        "type": 1,
        "top": false,
        "forcePlan": false,
        "viewCount": 313,
        "commentCount": 0,
        "editUrl": "https://editor.csdn.net/md?articleId=130450076",
        "postTime": "2023-04-30 23:12:13",
        "diggCount": 1,
        "formatTime": "前天 23:12",
        "picList": [
          "https://img-blog.csdnimg.cn/2e817e14046f4cba9663c89978198f12.png"
        ]
      }
    ],
    "total": 287
  }
}

1.1 Designing the Transformation

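The URL is passed in as a variable. The transformation consists of the following steps, in order: Get Variables → REST Client → JSON Input → Select Values → Text File Output.

Once the design is complete, save the transformation as a .ktr file.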
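The Get Variables step reads the value supplied from Java by referencing the variable by name, typically written as `${url}`. Kettle resolves that reference at run time. The sketch below is a simplified, plain-Java analogue of that `${name}` substitution, written only to show the idea. It is not Kettle's implementation, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class VariableSubstitution {

    // Matches ${name}; the variable name is captured in group 1.
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)\\}");

    // Simplified stand-in for Kettle-style substitution: each ${name} is
    // replaced by its value; unknown variables are left untouched.
    static String substitute(String template, Map<String, String> vars) {
        Matcher m = VAR.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = vars.get(m.group(1));
            String replacement = value != null ? value : m.group(0);
            // quoteReplacement keeps '$' and '\' in values literal
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> vars = Map.of("url",
                "https://blog.csdn.net/community/home-api/v1/get-business-list?page=1");
        System.out.println(substitute("${url}", vars));
    }
}
```

Leaving unknown references unchanged, rather than blanking them, makes a missing variable easy to spot in the output.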

1.2 Running the Transformation from Java

Create a new Maven project and add the following dependencies and repository to pom.xml:

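<dependencies>
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-core</artifactId>
        <version>9.6.0.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-engine</artifactId>
        <version>9.6.0.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.pentaho.di.plugins</groupId>
        <artifactId>pdi-core-plugins-impl</artifactId>
        <version>9.6.0.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>pentaho</groupId>
        <artifactId>pentaho-capability-manager</artifactId>
        <version>9.6.0.0-SNAPSHOT</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>com.sun.jersey.contribs</groupId>
        <artifactId>jersey-apache-client4</artifactId>
        <version>1.9.1</version>
    </dependency>
    <dependency>
        <groupId>com.sun.jersey</groupId>
        <artifactId>jersey-core</artifactId>
        <version>1.19.1</version>
    </dependency>
    <dependency>
        <groupId>com.sun.jersey</groupId>
        <artifactId>jersey-client</artifactId>
        <version>1.19.1</version>
    </dependency>
    <dependency>
        <groupId>com.sun.jersey</groupId>
        <artifactId>jersey-bundle</artifactId>
        <version>1.19.1</version>
    </dependency>
</dependencies>

<repositories>
    <repository>
        <id>pentaho-public</id>
        <name>Pentaho Public</name>
        <url>https://repo.orl.eng.hitachivantara.com/artifactory/pnt-mvn/</url>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>daily</updatePolicy>
        </releases>
        <snapshots>
            <enabled>true</enabled>
            <updatePolicy>interval:15</updatePolicy>
        </snapshots>
    </repository>
</repositories>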

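Under src/main/resources, create a file named kettle-password-encoder-plugins.xml with the following content. It registers the default password encoder, which KettleEnvironment.init() expects to find on the classpath: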

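<password-encoder-plugins>
    <password-encoder-plugin id="Kettle">
        <description>Kettle Password Encoder</description>
        <classname>org.pentaho.di.core.encryption.KettleTwoWayPasswordEncoder</classname>
    </password-encoder-plugin>
</password-encoder-plugins>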

The Java code:

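import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.KettleLoggingEvent;
import org.pentaho.di.core.logging.KettleLoggingEventListener;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.parameters.UnknownParamException;
import org.pentaho.di.core.plugins.PluginFolder;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;

public class RunTrans {

    public static void main(String[] args) {
        try {
            // Register the PDI plugin folder; change this to your own installation directory
            StepPluginType.getInstance().getPluginFolders()
                    .add(new PluginFolder("D:/data-integration_9_3/plugins/", false, true));
            // Initialize the Kettle environment
            KettleEnvironment.init();
        } catch (KettleException e) {
            e.printStackTrace();
            return; // nothing can run without an initialized environment
        }

        String ktrPath = "D:/data/job/trans.ktr";
        String url = "https://blog.csdn.net/community/home-api/v1/get-business-list?page=1&size=20&businessType=blog&orderby=&noMore=false&year=&month=&username=qq_43692950";

        // Variables passed to the transformation
        Map<String, String> variableMap = new HashMap<>();
        variableMap.put("url", url);

        boolean res = runTrans(ktrPath, variableMap, null);
        System.out.println("Transformation result: " + res);
    }

    private static boolean runTrans(String ktrPath, Map<String, String> variableMap, Map<String, String> parameterMap) {
        try {
            // Load the .ktr file (no repository)
            TransMeta transMeta = new TransMeta(ktrPath, (Repository) null);
            Trans trans = new Trans(transMeta);
            trans.setLogLevel(LogLevel.MINIMAL);

            // Variables
            if (Objects.nonNull(variableMap) && !variableMap.isEmpty()) {
                variableMap.forEach(trans::setVariable);
            }

            // Named parameters (declared in the .ktr)
            if (Objects.nonNull(parameterMap) && !parameterMap.isEmpty()) {
                parameterMap.forEach((k, v) -> {
                    try {
                        trans.setParameterValue(k, v);
                    } catch (UnknownParamException e) {
                        e.printStackTrace();
                    }
                });
                // Apply the parameter values set above
                trans.activateParameters();
            }

            // Print Kettle's log events (note: a new listener is added on every call)
            KettleLogStore.getAppender().addLoggingEventListener(new KettleLoggingEventListener() {
                @Override
                public void eventAdded(KettleLoggingEvent logs) {
                    System.out.println("Kettle log: level = " + logs.getLevel() + ", time = " + logs.getTimeStamp() + ", message = " + logs.getMessage());
                }
            });

            // Run the transformation
            trans.execute(new String[0]);
            // Wait until all steps have finished
            trans.waitUntilFinished();
            // Success means no errors were recorded
            return trans.getErrors() == 0;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}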
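In `runTrans` above, parameters differ from variables: they are meant to be declared in the .ktr's parameter list, and `setParameterValue` is declared to throw `UnknownParamException`. The code above only prints the stack trace and moves on. The sketch below is a plain-Java model of an alternative that collects the rejected names so the caller can decide what to do. `DeclaredParams` is a hypothetical stand-in for the transformation's parameter list, and `IllegalArgumentException` stands in for Kettle's `UnknownParamException`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ParameterDemo {

    // Hypothetical model of named parameters: only declared names are accepted.
    static class DeclaredParams {
        private final Set<String> declared;
        private final Map<String, String> values = new HashMap<>();

        DeclaredParams(Set<String> declared) {
            this.declared = declared;
        }

        void setParameterValue(String key, String value) {
            if (!declared.contains(key)) {
                throw new IllegalArgumentException("Unknown parameter: " + key);
            }
            values.put(key, value);
        }

        String get(String key) {
            return values.get(key);
        }
    }

    // Applies every entry and returns the names that were rejected,
    // instead of only printing a stack trace for each one.
    static List<String> applyAll(DeclaredParams target, Map<String, String> params) {
        List<String> rejected = new ArrayList<>();
        params.forEach((k, v) -> {
            try {
                target.setParameterValue(k, v);
            } catch (IllegalArgumentException e) {
                rejected.add(k);
            }
        });
        return rejected;
    }

    public static void main(String[] args) {
        DeclaredParams p = new DeclaredParams(Set.of("url"));
        Map<String, String> in = new HashMap<>();
        in.put("url", "https://example.com");
        in.put("pageSize", "20");
        System.out.println(applyAll(p, in)); // prints [pageSize]
    }
}
```

Returning the rejected names lets `runTrans` fail fast, for example by returning `false` before executing, when a caller passes a parameter the .ktr does not declare.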

Then check the result in the output directory.

II. Calling a Kettle Job from Java

For testing, the job simply runs the transformation above.

Save the job as a .kjb file.
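In the code below, `url` is set on the job, not on the transformation. It reaches the transformation because a child's variables are initialized from its parent when it starts. As far as I know, Kettle does this through its `VariableSpace` (`initializeVariablesFrom`), with additional rules for parameters set on the job entry. The sketch below is a simplified snapshot-copy model of that inheritance. `VariableScopeDemo` and `Scope` are hypothetical names, not Kettle classes.

```java
import java.util.HashMap;
import java.util.Map;

public class VariableScopeDemo {

    // Hypothetical model: a child scope copies its parent's variables when it
    // is created, then keeps its own map, so later changes do not flow back.
    static class Scope {
        private final Map<String, String> vars = new HashMap<>();

        Scope(Scope parent) {
            if (parent != null) {
                vars.putAll(parent.vars); // snapshot of the parent's variables
            }
        }

        void setVariable(String name, String value) {
            vars.put(name, value);
        }

        String getVariable(String name) {
            return vars.get(name);
        }
    }

    public static void main(String[] args) {
        Scope job = new Scope(null);
        job.setVariable("url", "page=2");
        Scope trans = new Scope(job);                // the transformation started by the job
        System.out.println(trans.getVariable("url")); // prints page=2
        trans.setVariable("url", "override");         // local change only
        System.out.println(job.getVariable("url"));   // still page=2
    }
}
```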

The Java code:

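import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.KettleLoggingEvent;
import org.pentaho.di.core.logging.KettleLoggingEventListener;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.parameters.UnknownParamException;
import org.pentaho.di.core.plugins.PluginFolder;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.Repository;

public class RunJob {

    public static void main(String[] args) {
        try {
            // Register the PDI plugin folder; change this to your own installation directory
            StepPluginType.getInstance().getPluginFolders()
                    .add(new PluginFolder("D:/data-integration_9_3/plugins/", false, true));
            // Initialize the Kettle environment
            KettleEnvironment.init();
        } catch (KettleException e) {
            e.printStackTrace();
            return; // nothing can run without an initialized environment
        }

        String kjbPath = "D:/data/job/job.kjb";
        String url = "https://blog.csdn.net/community/home-api/v1/get-business-list?page=2&size=20&businessType=blog&orderby=&noMore=false&year=&month=&username=qq_43692950";

        // Variables passed to the job; the transformation it runs reads url from here
        Map<String, String> variableMap = new HashMap<>();
        variableMap.put("url", url);

        boolean res = runJob(kjbPath, variableMap, null);
        System.out.println("Job result: " + res);
    }

    private static boolean runJob(String kjbPath, Map<String, String> variableMap, Map<String, String> parameterMap) {
        try {
            // Load the .kjb file (no repository)
            JobMeta jobMeta = new JobMeta(kjbPath, (Repository) null);
            Job job = new Job((Repository) null, jobMeta);
            job.setLogLevel(LogLevel.MINIMAL);

            // Variables
            if (Objects.nonNull(variableMap) && !variableMap.isEmpty()) {
                variableMap.forEach(job::setVariable);
            }

            // Named parameters (declared in the .kjb)
            if (Objects.nonNull(parameterMap) && !parameterMap.isEmpty()) {
                parameterMap.forEach((k, v) -> {
                    try {
                        job.setParameterValue(k, v);
                    } catch (UnknownParamException e) {
                        e.printStackTrace();
                    }
                });
                // Apply the parameter values set above
                job.activateParameters();
            }

            // Print Kettle's log events (note: a new listener is added on every call)
            KettleLogStore.getAppender().addLoggingEventListener(new KettleLoggingEventListener() {
                @Override
                public void eventAdded(KettleLoggingEvent logs) {
                    System.out.println("Kettle log: level = " + logs.getLevel() + ", time = " + logs.getTimeStamp() + ", message = " + logs.getMessage());
                }
            });

            // Run the job; Job extends Thread, so start() returns immediately
            job.start();
            // Wait until the job has finished
            job.waitUntilFinished();
            // Success means no errors were recorded
            return job.getErrors() == 0;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}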
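Kettle's `Job` class extends `Thread`, so `job.start()` returns immediately and the job runs in the background. That is why `waitUntilFinished()` must come before the error count is read. The sketch below is a plain-Java model of this start-then-wait pattern. `FakeJob` is a hypothetical stand-in that simply records an error when asked to fail. Here `join()` plays the role of `waitUntilFinished()`; Kettle's own implementation may differ.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class StartAndWaitDemo {

    // Hypothetical stand-in for a Kettle Job: the work runs on its own thread
    // and errors are counted while it runs.
    static class FakeJob extends Thread {
        private final AtomicInteger errors = new AtomicInteger();
        private final boolean fail;

        FakeJob(boolean fail) {
            this.fail = fail;
        }

        @Override
        public void run() {
            // The job entries would execute here; simulate one failing entry.
            if (fail) {
                errors.incrementAndGet();
            }
        }

        void waitUntilFinished() {
            try {
                join(); // block the caller until run() has returned
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        int getErrors() {
            return errors.get();
        }
    }

    static boolean runAndCheck(boolean fail) {
        FakeJob job = new FakeJob(fail);
        job.start();             // returns immediately; run() executes asynchronously
        job.waitUntilFinished(); // without this, getErrors() could be read too early
        return job.getErrors() == 0;
    }

    public static void main(String[] args) {
        System.out.println(runAndCheck(false)); // prints true
        System.out.println(runAndCheck(true));  // prints false
    }
}
```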

Then check the result in the output directory.
