目录

前言定位问题解决方法方法1:调高广播的超时时间方法2:禁用或者调低自动广播的阈值

总结

前言

最近真是和 Spark 任务杠上了,业务团队说是线上有个Spark调度任务出现了广播超时问题,根据经验来看应该比较好解决。

定位问题

接着就是定位问题了,先给大家看下抛出异常的任务日志信息:

ERROR exchange.BroadcastExchangeExec: Could not execute broadcast in 600 secs.

java.util.concurrent.TimeoutException: Futures timed out after [600 seconds]

at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:223)

at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)

at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:220)

at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:146)

at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:388)

at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:154)

at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:150)

at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:165)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:162)

at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:150)

at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:117)

at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:259)

at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:102)

at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:190)

at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:38)

at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:71)

at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:190)

at org.apache.spark.sql.execution.FileSourceScanExec.consume(DataSourceScanExec.scala:160)

at org.apache.spark.sql.execution.ColumnarBatchScan$class.produceBatches(ColumnarBatchScan.scala:144)

at org.apache.spark.sql.execution.ColumnarBatchScan$class.doProduce(ColumnarBatchScan.scala:83)

at org.apache.spark.sql.execution.FileSourceScanExec.doProduce(DataSourceScanExec.scala:160)

at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:91)

at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:86)

at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:165)

根据之前的经验,时间超时一般有这几种情况先排查一下:

网络传输广播超时时间阈值太小广播变量的数据量是否太大

通过询问集群运维人员,第一个可以排除了。 第二个从日志中可以看到,广播超时时间阈值设置的是600(10分钟) 第三个,从上面的两个图中我们看到,系统设置的 autoBroadcastJoinThreshold 大小为30M,如果小表的大小小于该值,则会将小表广播到所有executor中,需要注意的是ORC格式的表会对数据进行压缩,通常压缩比为2到3左右,但有些表的压缩比就会很高,有时可以达到10。那么设置过大的话,就会导致广播的时间变长,超过广播超时时间阈值;另外还会导致executor内存压力过大,容易出现OOM。

Broadcast Join 当大表 JOIN 小表时,如果小表足够小,可以将大表分片,分别用小表和每个大表的分片进行 JOIN,最后汇总,能够大大提升作业性能。

解决方法

从定位问题中可以得知,我们可以调整相关的参数来解决这个问题!

方法1:调高广播的超时时间

设置 spark.sql.broadcastTimeout ,单位是秒,假如设置是600,那么就是10分钟。

假如我们要调高广播的超时时间为15分钟,可以进行如下设置:

set spark.sql.broadcastTimeout = 900;

方法2:禁用或者调低自动广播的阈值

# 禁止使用自动广播

set spark.sql.autoBroadcastJoinThreshold=-1;

# 调低自动广播的阈值,官方默认值10M,平台默认值31457280(30M)

set spark.sql.autoBroadcastJoinThreshold=10485760;

总结

在进行Spark 任务开发中需要合理配置 spark.sql.broadcastTimeout 和 spark.sql.autoBroadcastJoinThreshold 参数,并配合 spark.executor.memory,使作业能够顺利执行。

好文阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: