知其然知其所以然

转载注明出处,且必须看到最后,留言证明

引发问题

spark任务状态获取不准确,任务是失败的,但结果返回成功,在注册的Listener中也可以看到状态先是FINISHED,过一会才会变成FAILED,因为FINISHED是isFinal(),导致任务退出,漏掉了后返回的失败状态

如果采用jobState.isFinal()判断任务结束,则会触发这个问题。

// spark程序退出的错误判断条件

if (jobState.isFinal()) {

countDownLatch.countDown();

}

原理解析

想要从根本上解决问题,还是需要知道为什么会先出现FINISHED后又变成FAILED,或者是怎么变成FAILED。 Spark 任务使用SparkLauncher.startApplication方式提交,核心代码如下,并通过代码逻辑总结为下图关系。

核心代码:

@Override

public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {

// 初始化LauncherServer

LauncherServer server = LauncherServer.getOrCreateServer();

// 初始化AppHandle

ChildProcAppHandle handle = new ChildProcAppHandle(server);

// 注册listener

for (SparkAppHandle.Listener l : listeners) {

handle.addListener(l);

}

// 注册handle

String secret = server.registerHandle(handle);

// 初始化进程

ProcessBuilder pb = createBuilder();

try {

// 启动进程

Process child = pb.start();

// 注册进程

handle.setChildProc(child, loggerName, logStream);

} catch (IOException ioe) {

handle.kill();

throw ioe;

}

return handle;

}

首先看LauncherServer这个类,进入到类里,查看注释,大致逻辑为当User App(客户端)提交任务到Spark App(服务端),L. Backend会向 L. Server(LauncherServer)响应,之后L. Backend都是与App Handle交互,那么我们就需要重点研究App Handle。

=> LauncherServer类注释

=> App Handle类分析 从上图总结的关系看出,AppHandle里一共有两个角色,listener和process。listener是我们自定义的,实现stateChanged方法,主要是监听状态变化,那么process是干什么的呢。通过handle.setChildProc()进入到具体实现,通过继续追源码,最终都开启了monitorChild线程

void setChildProc(Process childProc, String loggerName, InputStream logStream) {

this.childProc = childProc;

if (logStream != null) {

this.redirector = new OutputRedirector(logStream, loggerName,

SparkLauncher.REDIRECTOR_FACTORY, this);

} else {

// If there is no log redirection, spawn a thread that will wait for the child process

// to finish.

SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild).start();

}

}

进入到monitorChild方法,其实就是做一件事,监控SparkLauncher.startApplication里启动的进程process,并且根据进程值做相应状态处理。其中有一句注释

Override state with failure if the current state is not final, or is success

翻译过来就是:

当进程结束时,如果状态是 未完成 或 成功 ,则覆盖(更新)为失败。

与问题现象一致,到这,问题根因已经找到,顺便就把问题彻底解决。 从整个逻辑上看,当进程不存在时(proc.isAlive()),并且处理完状态后,最终调用dispose()方法。更新disposed属性为true,在父类AbstractAppHandle中有个isDisposed()方法可以获取该值。

/**

* Wait for the child process to exit and update the handle's state if necessary, according to

* the exit code.

*/

void monitorChild() {

Process proc = childProc;

// 进程活着则一直等待,进程结束则走下面的处理

while (proc.isAlive()) {

proc.waitFor();

}

// 获取最终进程值

int ec;

try {

ec = proc.exitValue();

} catch (Exception e) {

ec = 1;

}

// 根据进程处理状态

if (ec != 0) {

State currState = getState();

// Override state with failure if the current state is not final, or is success.

if (!currState.isFinal() || currState == State.FINISHED) {

setState(State.FAILED, true);

}

}

// 释放

dispose();

}

}

AbstractAppHandle.isDisposed()方法

boolean isDisposed() {

return disposed;

}

至此,spark程序是否真正结束应该通过AbstractAppHandle.isDisposed()方法判断 q: 是否可以使用proc.isAlive()方法? a: 不可以,proc.isAlive()判断后,才会更新状态,也就是到最终状态的变更还是会有一点时间差

正确的代码编写

同步提交

思路:采用CountDownLatch,在spark程序未完成之前一直wait

public void syncSubmit(String mainClass, String args, Map confMap) throws IOException {

// 用于阻塞主线程,等待任务结束

CountDownLatch countDownLatch = new CountDownLatch(1);

// 自定义SparkLauncher, 继承SparkLauncher

sparkLauncher = new CustomSparkSubmitLauncher();

// todo 吧啦吧啦 自己设置参数

sparkLauncher

.setJavaHome(xxx)

.setSparkHome(xxx)

.setMaster(xxx)

.setDeployMode(xxx)

.setAppName(xxx)

.setVerbose(true);

// 重点来了,启动spark程序,注册Listener,用于获取程序状态

sparkAppHandle = sparkLauncher.startApplication(new SparkAppHandle.Listener() {

@Override

public void stateChanged(SparkAppHandle sparkAppHandle) {

jobState = sparkAppHandle.getState();

// print log when state change

if (sparkAppHandle.getAppId() != null) {

logger.info("{} stateChanged: {}", sparkAppHandle.getAppId(), jobState.toString());

} else {

logger.info("stateChanged: {}", jobState.toString());

}

// spark程序退出的错误判断条件

// if (jobState.isFinal()) {

// countDownLatch.countDown();

// }

// spark程序退出的正确判断条件

if (sparkLauncher.isDisposed()){

countDownLatch.countDown();

}

}

@Override

public void infoChanged(SparkAppHandle handle) {

}

});

// 阻塞主线程

countDownLatch.await();

}

异步提交

思路:提交后使用sparkLauncher.isDisposed()轮询监控spark程序是否退出

public void asyncSubmit(String mainClass, String args, Map confMap) throws IOException {

// 自定义SparkLauncher, 继承SparkLauncher

sparkLauncher = new CustomSparkSubmitLauncher();

// todo 吧啦吧啦 自己设置参数

sparkLauncher

.setJavaHome(xxx)

.setSparkHome(xxx)

.setMaster(xxx)

.setDeployMode(xxx)

.setAppName(xxx)

.setVerbose(true);

// 重点来了,启动spark程序,注册Listener,用于获取程序状态

sparkAppHandle = sparkLauncher.startApplication(new SparkAppHandle.Listener() {

@Override

public void stateChanged(SparkAppHandle sparkAppHandle) {

jobState = sparkAppHandle.getState();

// print log when state change

if (sparkAppHandle.getAppId() != null) {

logger.info("{} stateChanged: {}", sparkAppHandle.getAppId(), jobState.toString());

} else {

logger.info("stateChanged: {}", jobState.toString());

}

}

@Override

public void infoChanged(SparkAppHandle handle) {

}

});

}

public void main() {

asyncSubmit(参数)

while (!sparkLauncher.isDisposed()) {

Thread.sleep(5000)

logger.info("job state: " + jobState)

}

logger.info("job final")

// todo something

}

CustomSparkSubmitLauncher

注意:类名随意该类必须在org.apache.spark.launcher包下,所以在项目里新建一个相同路径即可 原因:AbstractAppHandle 不是public的,所以不能被其他包的类访问,为什么要用这个类,因为要用isDisposed()方法,那为什么要用这个方法呢,那就看下面的详细解释吧。

package org.apache.spark.launcher; // 包路径非常重要,必须是该路径

import java.io.IOException;

public class CustomSparkSubmitLauncher extends SparkLauncher {

private SparkAppHandle sparkAppHandle;

@Override

public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {

this.sparkAppHandle = super.startApplication(listeners);

return sparkAppHandle;

}

public boolean isDisposed() {

return ((AbstractAppHandle) sparkAppHandle).isDisposed();

}

}

相关链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: