简介

ETL是英文Extract-Transform-Load的缩写,用来描述将数据从源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程,它能够对各种分布的、异构的源数据(如关系数据)进行抽取,按照预先设计的规则将不完整数据、重复数据以及错误数据等“脏"数据内容进行清洗,得到符合要求的“干净”数据,并加载到数据仓库中进行存储,这些“干净”数据就成为了数据分析、数据挖掘的基石。

kettle是一个开源ETL工具。kettle提供了基于java的图形化界面,使用很方便。kettle提供了基于 JAVA的脚步编写功能,可以灵活地自定义ETL过程,使自行定制、批量处理等成为可能,这才是一个程序员需要做的工作,而不仅是象使用word一样操作 kettle用户界面。

环境集成:

参考:java集成kettle教程(附示例代码)_kettle java_成伟平2022的博客-CSDN博客

代码:

pom.xml添加:

mysql

mysql-connector-java

com.alibaba

druid

1.2.11

pentaho-kettle

kettle-core

8.2.0.7-719

system

${project.basedir}/lib/kettle-core-8.2.0.7-719.jar

pentaho-kettle

kettle-engine

8.2.0.7-719

system

${project.basedir}/lib/kettle-engine-8.2.0.7-719.jar

pentaho-kettle

metastore

8.2.0.7-719

system

${project.basedir}/lib/metastore-8.2.0.7-719.jar

org.apache.commons

commons-vfs2

2.2

com.google.guava

guava

17.0

commons-io

commons-io

2.2

commons-lang

commons-lang

2.6

commons-codec

commons-codec

1.10

com.jcraft

jsch

0.1.54

net.sourceforge.jexcelapi

jxl

2.6.12

@RestController

@RequestMapping("${application.admin-path}/etl-kettl")

//@Api(tags = "ETL-Kettle的demo接口")

public class KettleDemoContrllor {

@Resource

KettleService kettleService;

@GetMapping("/execKtr")

//@ApiOperation("执行ktr文件")

private Object runKtr(String filename) throws Exception {

return R.buildOkData(kettleService.runTaskKtr(filename,null).toString());

}

@GetMapping("/execKjb")

//@ApiOperation("执行kjb文件")

private Object runKjb(String filename) throws Exception {

return R.buildOkData(kettleService.runTaskKjb(filename, null).toString());

}

}

public interface KettleService {

/**

* 开始执行ETL任务(ktr文件)

*

* @param taskFileName 执行的任务文件名(ktr)

* @param params 执行任务输入的参数

* @return 运行结果

* @throws Exception 没有找到配置文件,Kettle的运行异常不会抛出

*/

Object runTaskKtr(String taskFileName, Map params) throws Exception;

/**

* 开始执行ETL任务(kjb文件)

*

* @param taskFileName 执行的任务文件名(kjb)

* @param params 执行任务输入的参数

* @return 运行结果

* @throws Exception 没有找到配置文件,Kettle的运行异常不会抛出

*/

Object runTaskKjb(String taskFileName, Map params) throws Exception;

}

@Service

public class KettleServiceImpl implements KettleService {

@Value("${kettle.script.path}")

private String kettleScriptPath;

private static final Logger logger = LoggerFactory.getLogger("kettle-service-log");

private final List KTR_METAS = new ArrayList<>();

private final List KJB_METAS = new ArrayList<>();

private List getFiles(String path, String subName) {

List files = new ArrayList<>();

File file = new File(path);

File[] tempList = file.listFiles();

if (tempList == null){

return files;

}

for (File value : tempList) {

if (value.isFile()) {

if (Objects.equals(value.toString().substring(value.toString().length() - 3), subName)) {

files.add(value.getName());

}

}

}

return files;

}

//采用单列模式,项目启动时加载环境,加载所有的转换配置、任务配置,后续执行就会快一点

//@PostConstruct

public void init() throws KettleException {

logger.info("----------------------开始初始化ETL配置------------------------");

KettleEnvironment.init();

List ktrFiles = getFiles(kettleScriptPath, "ktr");

List kjbFiles = getFiles(kettleScriptPath, "kjb");

logger.info("需要加载的转换为:" + ktrFiles.toString());

logger.info("需要加载的任务为:" + kjbFiles.toString());

logger.info("----------------------开始加载ETL配置--------------------------");

for (String ktrFile : ktrFiles) {

KtrMeta ktrMeta = new KtrMeta();

ktrMeta.setName(ktrFile);

ktrMeta.setTransMeta(new TransMeta(kettleScriptPath + ktrFile));

KTR_METAS.add(ktrMeta);

logger.info("成功加载转换配置:" + ktrFile);

}

for (String kjbFile : kjbFiles) {

KjbMeta kjbMeta = new KjbMeta();

kjbMeta.setName(kjbFile);

kjbMeta.setJobMeta(new JobMeta(kettleScriptPath + kjbFile, null));

KJB_METAS.add(kjbMeta);

logger.info("成功加载任务配置:" + kjbFile);

}

logger.info("----------------------全部ETL配置加载完毕-----------------------");

}

@Override

public Object runTaskKtr(String ktrFileName, Map params) {

logger.info("开始执行转换:" + ktrFileName);

TransMeta transMeta = null;

for (KtrMeta ktrMeta : KTR_METAS) {

if(Objects.equals(ktrFileName,ktrMeta.getName())){

transMeta = ktrMeta.getTransMeta();

break;

}

}

//如果在缓存的列表里面没找到需要自信的配置,尝试手动加载

try {

if (transMeta == null) {

logger.warn("资源池没有找到配置文件:" + ktrFileName+" 尝试二次加载!");

KettleEnvironment.init();

transMeta = new TransMeta(kettleScriptPath + File.separator + ktrFileName);

if(transMeta==null) throw new RuntimeException("未找到需要执行的转换配置文件:");

}

Trans trans = new Trans(transMeta);

if (params != null) {

for (Map.Entry entry : params.entrySet()) {

trans.setParameterValue(entry.getKey(), entry.getValue());

}

}

//trans.prepareExecution(null);

//trans.startThreads(); //启用新的线程加载

trans.execute(null);

trans.waitUntilFinished();

return trans.getResult();

}catch (Exception e)

{

e.printStackTrace();

return e.getMessage();

}

}

@Override

public Object runTaskKjb(String objFileName, Map params) throws Exception {

logger.info("开始执行任务:" + objFileName);

JobMeta jobMeta = null;

for (KjbMeta kjbMeta : KJB_METAS) {

if(Objects.equals(objFileName,kjbMeta.getName())){

jobMeta = kjbMeta.getJobMeta();

}

}

try {

if (jobMeta == null) {

logger.warn("资源池没有找到配置文件:" + objFileName+" 尝试二次加载!");

KettleEnvironment.init();

jobMeta = new JobMeta(kettleScriptPath + File.separator + objFileName,null);

if(jobMeta==null) throw new RuntimeException("未找到需要执行的任务配置文件:"+objFileName);

}

Job job = new Job(null, jobMeta);

if (params != null) {

for (Map.Entry entry : params.entrySet()) {

job.setParameterValue(entry.getKey(), entry.getValue());

}

}

job.start();

job.waitUntilFinished();

return job.getResult();

}catch (Exception e)

{

e.printStackTrace();

return e.getMessage();

}

}

}

@Data

public class KtrMeta {

private TransMeta transMeta;

private String name;

}

@Data

public class KjbMeta {

private JobMeta jobMeta;

private String name;

}

总结:

集成后感觉没什么必要集成到项目里面去。关键还是需要学会工具的使用,以便进行数据收集与治理。

参考:1_ETL和Kettle概述_哔哩哔哩_bilibili

下载: kettle工具下载

推荐链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: