引入flink依赖

//stream api和table api

org.apache.flink

flink-table-api-java-bridge_2.11

1.14.2

provided

org.apache.flink

flink-clients_2.11

1.14.2

provided

编写入口

目录结构

com.example.demo
├── auto
│   └── ChildApplication
├── task
│   ├── Task
│   ├── AbstractTask
│   ├── TaskManager
│   └── time
│       ├── TimeSource
│       └── TimeTask
└── Demo2Application

子容器初始化类

@EnableAutoConfiguration

public class ChildApplication {

}

任务接口

public interface Task {

void run(String... args) throws Exception;

}

抽象任务类

@Slf4j

public abstract class AbstractTask implements Task {

@Override

public void run(String... args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//解析spring参数

DefaultApplicationArguments arguments = new DefaultApplicationArguments(args);

//解析flink参数

ParameterTool parameterTool = ParameterTool.fromArgs(args);

//合并两种参数

Configuration configuration = new Configuration();

Map map = parameterTool.toMap();

for (Map.Entry entry : map.entrySet()) {

if (Objects.equals(entry.getValue(), "__NO_VALUE_KEY")) {

continue;

}

configuration.setString(entry.getKey(), entry.getValue());

}

Set optionNames = arguments.getOptionNames();

for (String optionName : optionNames) {

List optionValues = arguments.getOptionValues(optionName);

if (CollectionUtils.isEmpty(optionValues)) {

continue;

}

configuration.setString(optionName, String.join(",", optionValues));

}

//设置全局参数

env.getConfig().setGlobalJobParameters(configuration);

//配置任务

configTask(env, parameterTool);

//提交任务

JobClient jobClient = env.executeAsync(getClass().getName());

if (jobClient instanceof WebSubmissionJobClient) {

return;

}

jobClient.getJobExecutionResult()

.whenComplete(new BiConsumer() {

@Override

public void accept(JobExecutionResult jobExecutionResult, Throwable throwable) {

log.error("time {}", jobExecutionResult.getNetRuntime(TimeUnit.SECONDS));

}

});

}

public abstract void configTask(StreamExecutionEnvironment env, ParameterTool tool);

}

任务管理器

@Slf4j

@Service

public class TaskManager implements CommandLineRunner {

@Resource

List taskList;

@Override

public void run(String... args) throws Exception {

ParameterTool parameterTool = ParameterTool.fromArgs(args);

log.info("程序参数 {}", parameterTool);

String runTaskName = parameterTool.get("task");

if (CollectionUtils.isEmpty(taskList) || StringUtils.isBlank(runTaskName)) {

return;

}

for (Task task : taskList) {

if (Objects.equals(runTaskName, task.getClass().getName())) {

task.run(args);

}

}

}

}

一个计时任务数据源

@Slf4j

@Service

public class TimeSource extends RichSourceFunction {

volatile boolean running = true;

private JdbcTemplate jdbcTemplate;

@Override

public void open(Configuration parameters) throws Exception {

//创建一个容器,并拿到需要的bean

List args = new LinkedList<>();

args.add(String.format("--spring.application.admin.jmx-name=org.springframework.boot:type=Admin,name=%s", cls.getName() + UUID.randomUUID()));

args.add(String.format("--spring.jmx.default-domain=%s", cls.getName() + UUID.randomUUID()));

Configuration globalJobParameters = (Configuration) runtimeContext.getExecutionConfig().getGlobalJobParameters();

String activeKey = "spring.profiles.active";

String active = globalJobParameters.getString(ConfigOptions.key(activeKey).stringType().noDefaultValue());

if (StringUtils.isNotEmpty(active)) {

args.add(String.format("--%s=%s", activeKey, active));

}

ConfigurableApplicationContext applicationContext = SpringApplication.run(ChildApplication.class, args.toArray(new String[0]));

jdbcTemplate = applicationContext.getBean(JdbcTemplate.class);

}

@Override

public void run(SourceContext ctx) throws Exception {

while (running) {

Date date = DataAccessUtils.uniqueResult(jdbcTemplate.queryForList("select now()", Date.class));

ctx.collect(date);

TimeUnit.SECONDS.sleep(1);

}

}

@Override

public void cancel() {

running = false;

}

}

写这个数据源类花了很长时间,期间报了很多错,一直不符合预期:

xxx is not serializable:flink的算子可能会在不同的机器上运行,所以类信息会序列化之后传输。所以算子不能有任何不能序列化的字段(字段为null除外)有些需要的字段没有实现Serializable,但是又确实要用到,比如JdbcTemplate,如果是mybatis的话,就是各种mapper;像这些字段,只能在open方法里面初始化。有两种方法做这个初始化:一是,通过全局参数把一些连接信息传到算子,然后在open方法中初始化JdbcTemplate;二是,在open方法中重新创建一个容器,然后从容器中拿到JdbcTemplate。第一种方法,比较容易实现,但是要手动装配JdbcTemplate;第二种方法,要重新创建一个容器,装配的任务全都交给容器;想法是很nice,但在一个容器中创建另一个容器,比想象中的要复杂一些。在一个容器中初始化另一个容器:

需要一个容器初始化类:因为毕竟不需要注入所有对象,所以不能用主程序启动类Demo2Application;但是又要autoconfigure里面的很多对象,所以考虑加@EnableAutoConfiguration注解,同时放入单独的auto包,避免扫到不需要的bean定义;如果需要mybatis的mapper,考虑加@MapperScan注解。定义好容器初始化类之后,启动报错:Error creating bean with name ‘springApplicationAdminRegistrar’ defined in class path resource [org/springframework/boot/autoconfigure/admin/SpringApplicationAdminJmxAutoConfiguration.class]: Invocation of init method failed; nested exception is javax.management.InstanceAlreadyExistsException: org.springframework.boot:type=Admin,name=SpringApplication。看错误信息是实例重复了,这个有两种解决办法:

容器初始化类直接排除掉SpringApplicationAdminJmxAutoConfiguration.class:@EnableAutoConfiguration(exclude = {SpringApplicationAdminJmxAutoConfiguration.class})子容器启动时修改spring.application.admin.jmx-name:–spring.application.admin.jmx-name=org.springframework.boot:type=Admin,name=%s 再启动,还是报错:Unable to register MBean [HikariDataSource (HikariPool-2)] with key ‘dataSource’; nested exception is javax.management.InstanceAlreadyExistsException: com.zaxxer.hikari:name=dataSource,type=HikariDataSource。又是个实例重复的问题,这个问题百度了下,需要给–spring.jmx.default-domain配置个新的值:–spring.jmx.default-domain=%s再启动,子容器正常创建,程序运行发现ok打包上传flink web,提交运行,正常!

一个计时任务

@Slf4j

@Service

public class TimeTask extends AbstractTask {

@Resource

private TimeSource timeSource;

@Override

public void configTask(StreamExecutionEnvironment env, ParameterTool tool) {

env.getConfig().setAutoWatermarkInterval(0);

env.addSource(timeSource)

.setParallelism(1)

.print()

.setParallelism(1);

}

}

主程序启动类

@SpringBootApplication

public class Demo2Application {

public static void main(String[] args) {

SpringApplication.run(Demo2Application.class, args);

}

}

打包程序

设置parent

org.springframework.boot

spring-boot-starter-parent

2.7.5

直接使用spring-boot-maven-plugin?

org.springframework.boot

spring-boot-maven-plugin

因为spring-boot-maven-plugin打包区分了main-class和start-class,打包之后main-class是org.springframework.boot.loader.JarLauncher引导类,上传到flink web执行报错。

考虑使用maven-shade-plugin

参考SpringBoot超详细讲解集成Flink的部署与打包方法的方法二写了一版:

org.apache.maven.plugins

maven-shade-plugin

3.3.0

package

shade

false

com.google.code.findbugs:jsr305

org.slf4j:*

log4j:*

*:*

module-info.class

META-INF/*.SF

META-INF/*.DSA

META-INF/*.RSA

implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">

META-INF/spring.handlers

reference.conf

implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">

META-INF/spring.factories

implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">

META-INF/spring.schemas

implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>

implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">

${start-class}

结果报错:

Cannot find ‘resource’ in class org.apache.maven.plugins.shade.resource.ServicesResourceTransformer

纠结了半天,也没找到原因

再试试maven-assembly-plugin

org.apache.maven.plugins

maven-assembly-plugin

3.3.0

${start-class}

jar-with-dependencies

make-assembly

package

single

可以正常打包,本地也能运行,但是上传到flink web报错

LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.apache.logging.slf4j.Log4jLoggerFactory loaded from file:/opt/flink/lib/log4j-slf4j-impl-2.16.0.jar)

很明显,日志相关的jar冲突了。那么问题就是怎么配置maven-assembly-plugin,打包的时候移除org.apache.logging.log4j或ch.qos.logback?这个也比较困难,需要自定义assembly.xml文件,相对来说成本比较大。

重回maven-shade-plugin

找到很多资料,包括flink官方的maven打包方式也是用maven-shade-plugin,所以决定还是使用maven-shade-plugin。

那怎么解决Cannot find 'resource' in class org.apache.maven.plugins.shade.resource.ServicesResourceTransformer的问题呢?

恰好最近在看maven pom文件的相关知识,不小心打开了spring-boot-starter-parent的pluginManagement,发现里面定义很多插件,其中就包括maven-shade-plugin。

按照pom依赖的逻辑,只要在build->plugins声明maven-shade-plugin就行:

org.apache.maven.plugins

maven-shade-plugin

mvn clean package

打包成功了!

仔细翻看spring-boot-starter-parent声明的maven-shade-plugin,发现executions->execution->configuration->transformers的内容在spring-boot的不同版本是不同的。难怪找不到resource。

后续打包上传到flink web,也是报日志相关的jar冲突,不过maven-shade-plugin打包排除依赖比maven-assembly-plugin简单多了。由于flink运行时包含/opt/flink/lib/log4j-slf4j-impl-2.16.0.jar,所以果断排除logback,完整plugin配置如下:

org.apache.maven.plugins

maven-shade-plugin

com.google.code.findbugs:jsr305

ch.qos.logback:*

参考阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: