文章目录

前言一、Springboot 整合:1.1 引入jar:1.2 配置zookeeper 注册中心:1.3 定义job 业务类:1.4 job 注册到zookeeper:1.5 项目启动:1.5.1 zookeeper 注册中心实例:1.5.2 任务执行日志输出:

二、扩展:2.1 任务监听器:2. 2 DataflowJob 流工作:2.2.1 新建 DataflowJob:2.2.2 streaming.process 属性配置:2.2.3 执行效果:

总结

前言

本文对springBoot 整合 elasticjob 进行介绍。 本文环境 jdk:1.8 ; zookeeper : 3.7

一、Springboot 整合:

1.1 引入jar:

org.apache.shardingsphere.elasticjob

elasticjob-lite-core

3.0.1

org.yaml

snakeyaml

1.27

ch.qos.logback

logback-classic

1.2.12

1.2 配置zookeeper 注册中心:

ElasticJobZookeeper:

import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;

import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;

import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class ElasticJobZookeeper {

@Bean(initMethod = "init")

public CoordinatorRegistryCenter createRegistryCenter() {

ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181", "elastic-job");

zookeeperConfiguration.setConnectionTimeoutMilliseconds(10000);

zookeeperConfiguration.setSessionTimeoutMilliseconds(10000);

zookeeperConfiguration.setMaxSleepTimeMilliseconds(10000);

CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);

return regCenter;

}

}

1.3 定义job 业务类:

MyJob:

import lombok.extern.slf4j.Slf4j;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;

import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;

import org.springframework.stereotype.Component;

@Slf4j

@Component

public class MyJob implements SimpleJob {

@Override

public void execute(ShardingContext shardingContext) {

// 分片参数 0=text,1=image,2=radio,3=vedio

String shardingParameter= shardingContext.getShardingParameter();

String jobParameter= shardingContext.getJobParameter();

log.debug("job 执行 error,job名称:{},分片数量:{},分片:{},分片参数:{},jobParamer:{}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(),

shardingContext.getShardingItem(), shardingParameter,jobParameter);

if ("text".equals(jobParameter)) {

// do something by sharding

}

switch (shardingContext.getShardingItem()) {

case 0:

// do something by sharding item 0

break;

case 1:

// do something by sharding item 1

break;

case 2:

// do something by sharding item 2

break;

// case n: ...

}

}

}

MyJob1:

import lombok.extern.slf4j.Slf4j;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;

import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;

import org.springframework.stereotype.Component;

@Slf4j

@Component

public class MyJob1 implements SimpleJob {

@Override

public void execute(ShardingContext shardingContext) {

log.debug("job 执行 error,job名称:{},分片数量:{}",shardingContext.getJobName(),shardingContext.getShardingTotalCount());

switch (shardingContext.getShardingItem()) {

case 0:

// do something by sharding item 0

break;

case 1:

// do something by sharding item 1

break;

case 2:

// do something by sharding item 2

break;

// case n: ...

}

}

}

1.4 job 注册到zookeeper:

ElasticJobConfigure

import com.example.springelasticjob.quickstart.MyJob;

import com.example.springelasticjob.quickstart.MyJob1;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;

import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;

import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;

import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Configuration;

@Configuration

public class ElasticJobConfigure implements InitializingBean {

@Autowired

private MyJob myJob;

@Autowired

private MyJob1 myJob1;

@Autowired

private CoordinatorRegistryCenter coordinatorRegistryCenter;

public JobConfiguration createJobConfiguration(Class JobClass, int shardingTotalCount, String cron, String shardingItemParameters) {

// 创建作业配置

JobConfiguration jobConfiguration = JobConfiguration.newBuilder(JobClass.getName(), shardingTotalCount).cron(cron).overwrite(true)

.shardingItemParameters(shardingItemParameters).jobListenerTypes().build();

return jobConfiguration;

}

@Override

public void afterPropertiesSet() throws Exception {

JobConfiguration jobConfiguration = createJobConfiguration(myJob.getClass(), 1, "0/10 * * * * ?", null);

new ScheduleJobBootstrap(coordinatorRegistryCenter, myJob, jobConfiguration).schedule();

JobConfiguration jobConfiguration1 = createJobConfiguration(myJob1.getClass(), 1, "0/1 * * * * ?", null);

new ScheduleJobBootstrap(coordinatorRegistryCenter, myJob1, jobConfiguration1).schedule();

}

}

1.5 项目启动:

1.5.1 zookeeper 注册中心实例:

1.5.2 任务执行日志输出:

二、扩展:

2.1 任务监听器:

定义监听器:

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang.time.DateFormatUtils;

import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;

import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;

import java.util.Date;

@Slf4j

public class MyElasticJobListener implements ElasticJobListener {

private long beginTime = 0;

@Override

public void beforeJobExecuted(ShardingContexts shardingContexts) {

beginTime = System.currentTimeMillis();

log.info("===>{} MyElasticJobListener BEGIN TIME: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));

}

@Override

public void afterJobExecuted(ShardingContexts shardingContexts) {

long endTime = System.currentTimeMillis();

log.info("===>{} MyElasticJobListener END TIME: {},TOTAL CAST: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"), endTime - beginTime);

}

@Override

public String getType() {

return "myElasticJobListener";

}

}

2) 在项目resources 新建文件夹: META-INF\services 3)新建文件,名称为:org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener 文集内容:

# 监听器实现类的 类全路径

com.example.springelasticjob.config.MyElasticJobListener

4)job 配置增加监听器:

// 创建作业配置

JobConfiguration jobConfiguration = JobConfiguration.newBuilder("myjob-param", 1).cron("0/5 * * * * ?")

.overwrite(true).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").jobParameter("0=a,1=b,2=c")

.jobListenerTypes("myElasticJobListener")

.build();

jobListenerTypes(“myElasticJobListener”) 中 “myElasticJobListener” 要和 MyElasticJobListener getType() 返回的保持一致,否则启动无法找到 监听器:

2. 2 DataflowJob 流工作:

2.2.1 新建 DataflowJob:

import lombok.extern.slf4j.Slf4j;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;

import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;

import java.util.ArrayList;

import java.util.List;

/**

* 流任务

*/

@Slf4j

public class MyDataFlowJob implements DataflowJob {

@Override

public List fetchData(ShardingContext shardingContext) {

// 抓取数据

// 分片参数 0=text,1=image,2=radio,3=vedio

String jobParameter = shardingContext.getJobParameter();

log.debug("job 执行 error,job名称:{},分片数量:{},分片:{},分片参数:{}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), jobParameter);

List list = new ArrayList(1);

list.add("lgx");

return list;

}

@Override

public void processData(ShardingContext shardingContext, List list) {

// 数据处理

System.out.println("list.toString() = " + list.toString());

}

}

2.2.2 streaming.process 属性配置:

private static JobConfiguration createJobConfiguration() {

JobConfiguration jobConfiguration = JobConfiguration.newBuilder("myjob-dataflow-param", 1).cron("0/30 * * * * ?")

.overwrite(true).shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou").jobParameter("0=a,1=b,2=c")

// streaming.process 流处理设置为true

.setProperty("streaming.process","true")

.build();

return jobConfiguration;

}

2.2.3 执行效果:

虽然任务是每隔30s 执行一次,但是因为 fetchData 可以一直获取到数据,使的 processData 方法可以一直被调用:

总结

本文对 springBoot 整合 elasticjob 整合,监听器的使用,任务的流式处理做介绍。

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: