对于从事大数据开发的同学,经常会应用到explode(炸裂函数)和lateral view(侧输出流)。

  Explode(炸裂函数)

参数必须是array或者map格式(通常跟split函数使用);主要是将数组中每个元素单独取出来,可以单独使用。注意:单独使用时,仅选择目标字段展示,不可与主表中其他字段一起展示。

  Lateral view(侧输出流)

Lateral view(侧输出流)通常与UDTF函数一起使用。列转行的时候,通常用lateral view explode(字段)语法上在表后,where之前;理解上可以认为lateral view(虚表)与主表是inner join的逻辑(注意:理解上可以这么认为,但是底层逻辑并不是这样下面会做说明)Lateral view explode(字段) 虚拟表 as 列可以与主表的多个字段一起展示

Lateral view 与 主表逻辑上为什么可以认为是inner join?

 首先看一段HSQL:

select t3.value as categoryId

from ( select features,split(t2.categoryIdStr,',') as values

from AA

lateral view json_tuple(features, 'categoryId') t1 AS categoryIdArray

lateral view explode(split(regexp_replace(regexp_extract(t1.categoryIdArray,'^\\[(.+)\\]$',1),'\\}\\,\\{', '\\}\\|\\|\\{'),'\\|\\|')) t2 AS categoryIdStr

where features like '%categoryId%'

) tmp

lateral view explode(values) t3 AS value

当t1表中数据为null的时候,查询没有数据;所以理解上可以认为tmp表与t3表进行了inner join,导致最终这个查询没有数据。

但是实际上并不是inner join逻辑的原因可以分为两个

第一:t1虚拟表可以放在 AA表下面;放入tmp内,即使tmp内没有t1表的字段,在tmp外面仍然可以访问t1的数据。这说明AA表和t1表并不是inner join.

第二:通过Hive执行流程判断

显然在这个执行树中可以看出,整个执行过程没有reduce的出现,即整个执行过程没有shuffle的出现。我们知道,Hive中每次join都会产生一个mr将数据落地。所以,整个执行过程中没有reduce的出现,则说明lateral view(侧输出流)没有进行join操作。

HIVE中执行流程:

STAGE PLANS:

Stage: Stage-1

Map Reduce

Map Operator Tree:

TableScan

alias: aa

Filter Operator

predicate: (features like '%categoryId%') (type: boolean)

Lateral View Forward

Select Operator

Lateral View Join Operator

Lateral View Forward

Select Operator

Lateral View Join Operator

Select Operator

expressions: split(_col5, ',') (type: array)

Lateral View Forward

Select Operator

Lateral View Join Operator

Select Operator

File Output Operator

Select Operator

expressions: _col1 (type: array)

UDTF Operator

function name: explode

Lateral View Join Operator

outputColumnNames: _col2

Select Operator

File Output Operator

Select Operator

expressions: split(regexp_replace(regexp_extract(_col4, '^\[(.+)\]$', 1), '\}\,\{', '\}\|\|\{'), '\|\|') (type: array)

UDTF Operator

function name: explode

Lateral View Join Operator

Select Operator

expressions: split(_col5, ',') (type: array)

Lateral View Forward

Select Operator

Lateral View Join Operator

Select Operator

File Output Operator

Select Operator

expressions: _col1 (type: array)

UDTF Operator

function name: explode

Lateral View Join Operator

Select Operator

File Output Operator

Select Operator

expressions: features (type: string), 'categoryId' (type: string), _col1

UDTF Operator

function name: json_tuple

Lateral View Join Operator

Lateral View Forward

Select Operator

Lateral View Join Operator

Select Operator

expressions: split(_col5, ',') (type: array)

Lateral View Forward

Select Operator

Lateral View Join Operator

Select Operator

File Output Operator

Select Operator

expressions: _col1 (type: array)

UDTF Operator

function name: explode

Lateral View Join Operator

Select Operator

File Output Operator

Select Operator

expressions: split(regexp_replace(regexp_extract(_col4, '^\[(.+)\]$', 1), '\}\,\{', '\}\|\|\{'), '\|\|') (type: array)

UDTF Operator

function name: explode

Lateral View Join Operator

Select Operator

expressions: split(_col5, ',') (type: array)

Lateral View Forward

Select Operator

Lateral View Join Operator

Select Operator

File Output Operator

Select Operator

expressions: _col1 (type: array)

UDTF Operator

function name: explode

Lateral View Join Operator

Select Operator

File Output Operator

即:

整个执行流程大致是:上图的过程。整个过程都在map端完成,没有reduce的参与。(吐槽:Hive的Operator Tree看着很恶心)

源码解读:

对于lateral view的应用规则熟悉以后,都知道lateral view 与UDTF()函数连用。对应的Hive执行计划中UDTF Operator操作过程。

在自定义UDTF函数时,通常需要继承GenericUDTF函数。Hive源码中提供了以下六种UDTF函数继承GenericUDTF类和八个文件(其中需要关注UDTFCollector类),其中GenericUDTFExplode即hive中explode(炸裂函数)。

 

GenericUDTF源码中主要有一个方法:

// 该方法主要是将结果放入收集器中

protected final void forward(Object o) throws HiveException {

collector.collect(o);

}

通过该方法查找上查Collector接口,其中实现这个接口的类仅有

 

UDTFCollector类中主要有有一个方法:

// 该方法主要将GenericUDTF收集的数据传递给UDTFOperator

public void collect(Object input) throws HiveException {

op.forwardUDTFOutput(input);

counter++;

}

UDTFOperator类中主要有三个方法

// UDTF函数操作的初始化

protected void initializeOp(Configuration hconf) throws HiveException {

genericUDTF = conf.getGenericUDTF();

collector = new UDTFCollector(this);

genericUDTF.setCollector(collector);

udtfInputOI = (StructObjectInspector) inputObjInspectors[0];

objToSendToUDTF = new Object[udtfInputOI.getAllStructFieldRefs().size()];

MapredContext context = MapredContext.get();

if (context != null) {

context.setup(genericUDTF);

}

StructObjectInspector udtfOutputOI = genericUDTF.initialize(udtfInputOI);

if (conf.isOuterLV()) {

outerObj = Arrays.asList(new Object[udtfOutputOI.getAllStructFieldRefs().size()]);

}

if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEUDTFAUTOPROGRESS)) {

autoProgressor = new AutoProgressor(this.getClass().getName(), reporter,

Utilities.getDefaultNotificationInterval(hconf),

HiveConf.getTimeVar(

hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, TimeUnit.MILLISECONDS));

autoProgressor.go();

}

super.initializeOp(hconf);

}

// UDTF函数操作的(操作)过程

public void processOp(Object row, int tag) throws HiveException {

// The UDTF expects arguments in an object[]

StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];

List fields = soi.getAllStructFieldRefs();

for (int i = 0; i < fields.size(); i++) {

objToSendToUDTF[i] = soi.getStructFieldData(row, fields.get(i));

}

genericUDTF.process(objToSendToUDTF);

if (conf.isOuterLV() && collector.getCounter() == 0) {

collector.collect(outerObj);

}

collector.reset();

}

// 输出

public void forwardUDTFOutput(Object o) throws HiveException {

// Since the output of the UDTF is a struct, we can just forward that

forward(o, outputObjInspector);

}

其中核心关注

initializeOp方法:

if (conf.isOuterLV()) {

outerObj = Arrays.asList(new Object[udtfOutputOI.getAllStructFieldRefs().size()]);

}

该方法是指:UDTFOperator时,如果isOuterLV()为true,则获取到的Array数据大小一样的数组outerObj

processOp方法:

genericUDTF.process(objToSendToUDTF);

if (conf.isOuterLV() && collector.getCounter() == 0) {

collector.collect(outerObj);

}

该方法是指UDTFOperator时,如果isOuterLV()为true时,将outerObj结果放入UDTFOperator的收集器中

这两个方法中都有一个isOuterLV()方法,查看该方法的源码:

@Explain(displayName = "UDTF Operator")

public class UDTFDesc extends AbstractOperatorDesc {

private static final long serialVersionUID = 1L;

private GenericUDTF genericUDTF;

private boolean outerLV;

public UDTFDesc() {

}

public UDTFDesc(final GenericUDTF genericUDTF, boolean outerLV) {

this.genericUDTF = genericUDTF;

this.outerLV = outerLV;

}

public GenericUDTF getGenericUDTF() {

return genericUDTF;

}

public void setGenericUDTF(final GenericUDTF genericUDTF) {

this.genericUDTF = genericUDTF;

}

@Explain(displayName = "function name")

public String getUDTFName() {

return genericUDTF.toString();

}

public boolean isOuterLV() {

return outerLV;

}

public void setOuterLV(boolean outerLV) {

this.outerLV = outerLV;

}

@Explain(displayName = "outer lateral view")

public String isOuterLateralView() {

return outerLV ? "true" : null;

}

}

其中isOuterLateralView()这个方法说明HIVE中存在outer lateral view语法。当Hql语法中有这个写法即为true;

源码分析到这个地方,整体回溯上去就能发现当时用lateral view outer语法时,表明输入的结果集与输出的结果集个数一样。则使用lateral view outer UDTF()函数时,理解上可以认为是主表与虚拟表之间进行left join。注意:其本质上本没有进行join,只是就结果而言可以这么理解。

GenericUDTFExplode源码中主要有二个方法:

//初始化

public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {

if (args.length != 1) {

throw new UDFArgumentException("explode() takes only one argument");

}

ArrayList fieldNames = new ArrayList();

ArrayList fieldOIs = new ArrayList();

switch (args[0].getCategory()) {

case LIST:

inputOI = args[0];

fieldNames.add("col");

fieldOIs.add(((ListObjectInspector)inputOI).getListElementObjectInspector());

break;

case MAP:

inputOI = args[0];

fieldNames.add("key");

fieldNames.add("value");

fieldOIs.add(((MapObjectInspector)inputOI).getMapKeyObjectInspector());

fieldOIs.add(((MapObjectInspector)inputOI).getMapValueObjectInspector());

break;

default:

throw new UDFArgumentException("explode() takes an array or a map as a parameter");

}

return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,

fieldOIs);

}

// 执行过程

public void process(Object[] o) throws HiveException {

switch (inputOI.getCategory()) {

case LIST:

ListObjectInspector listOI = (ListObjectInspector)inputOI;

List list = listOI.getList(o[0]);

if (list == null) {

return;

}

for (Object r : list) {

forwardListObj[0] = r;

forward(forwardListObj);

}

break;

case MAP:

MapObjectInspector mapOI = (MapObjectInspector)inputOI;

Map map = mapOI.getMap(o[0]);

if (map == null) {

return;

}

for (Entry r : map.entrySet()) {

forwardMapObj[0] = r.getKey();

forwardMapObj[1] = r.getValue();

forward(forwardMapObj);

}

break;

default:

throw new TaskExecutionException("explode() can only operate on an array or a map");

}

}

其中if (map == null) {

        return;

      }

则返回null;

表明mr过程中,在map端没有匹配值的时候,返回null;方便理解的话便是虚表与主表进行inner join的结果。

好文推荐

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: