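Anyone who works in big-data development will regularly run into explode and lateral view.
Explode
The argument must be an array or a map (it is usually combined with split). explode emits each element of the array as a separate row, and it can be used on its own. Note: when used on its own, only the exploded column can be selected; it cannot be shown alongside other columns of the base table.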
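To make the row-generating behaviour concrete, here is a minimal sketch in plain Java (not Hive code) of what explode does with an array and with a map. The class ExplodeSketch and its method names are made up for illustration; only the behaviour (one output row per element or entry, nothing for null input) is taken from the explode source discussed later in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of explode(); not part of Hive. */
class ExplodeSketch {

    /** explode(array): one single-column output row per element. */
    public static <T> List<List<Object>> explodeArray(List<T> input) {
        List<List<Object>> rows = new ArrayList<>();
        if (input == null) {
            return rows; // null input produces no rows at all
        }
        for (T element : input) {
            List<Object> row = new ArrayList<>();
            row.add(element);
            rows.add(row);
        }
        return rows;
    }

    /** explode(map): one two-column (key, value) output row per entry. */
    public static <K, V> List<List<Object>> explodeMap(Map<K, V> input) {
        List<List<Object>> rows = new ArrayList<>();
        if (input == null) {
            return rows;
        }
        for (Map.Entry<K, V> entry : input.entrySet()) {
            List<Object> row = new ArrayList<>();
            row.add(entry.getKey());
            row.add(entry.getValue());
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Mirrors explode(split('a,b,c', ','))
        System.out.println(explodeArray(Arrays.asList("a,b,c".split(","))));
        // prints [[a], [b], [c]]

        Map<String, Integer> m = new LinkedHashMap<>();
        m.put("k1", 1);
        m.put("k2", 2);
        System.out.println(explodeMap(m));
        // prints [[k1, 1], [k2, 2]]
    }
}
```

Each inner list stands for one output row, which is why a single input value can turn into several rows.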
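Lateral view
Lateral view is usually used together with a UDTF. When turning a column into rows, the usual form is lateral view explode(column); syntactically it goes after the table and before where. Conceptually you can think of the lateral view (a virtual table) as being inner joined with the base table (note: this is only a mental model; the underlying mechanism is different, as explained below). With the form lateral view explode(column) virtual_table as col, the exploded column can be selected together with any number of base-table columns.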
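The mental model can be sketched in a few lines of plain Java: for every base row, run the row generator and pair the base columns with each row it emits. This is an illustrative model only; LateralViewSketch, BaseRow and explodeSplit are hypothetical names, and explodeSplit stands in for explode(split(tags, ',')).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of "FROM base LATERAL VIEW explode(split(tags, ',')) v AS tag". */
class LateralViewSketch {

    /** A base-table row: an id column plus a comma-separated tags column. */
    static final class BaseRow {
        final int id;
        final String tags;

        BaseRow(int id, String tags) {
            this.id = id;
            this.tags = tags;
        }
    }

    /** Stand-in for explode(split(tags, ',')): null input produces no rows. */
    static List<String> explodeSplit(String tags) {
        if (tags == null) {
            return new ArrayList<>();
        }
        return Arrays.asList(tags.split(","));
    }

    /** Pairs each base row with every row generated from it. */
    static List<String> lateralView(List<BaseRow> base) {
        List<String> out = new ArrayList<>();
        for (BaseRow row : base) {
            for (String tag : explodeSplit(row.tags)) {
                out.add(row.id + ":" + tag);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<BaseRow> base = Arrays.asList(
                new BaseRow(1, "a,b"),
                new BaseRow(2, null), // generates nothing, so it vanishes
                new BaseRow(3, "c"));
        System.out.println(lateralView(base));
        // prints [1:a, 1:b, 3:c]
    }
}
```

Row 2 disappears because nothing was generated for it, which is exactly why the result looks like an inner join even though each row is handled independently.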
Why can a lateral view and its base table be thought of as an inner join?
First, look at a piece of HiveQL:
select t3.value as categoryId
from (
    select features, split(t2.categoryIdStr,',') as values
    from AA
    lateral view json_tuple(features, 'categoryId') t1 AS categoryIdArray
    lateral view explode(split(regexp_replace(regexp_extract(t1.categoryIdArray,'^\\[(.+)\\]$',1),'\\}\\,\\{', '\\}\\|\\|\\{'),'\\|\\|')) t2 AS categoryIdStr
    where features like '%categoryId%'
) tmp
lateral view explode(values) t3 AS value
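The nested string functions in the second lateral view are easier to follow step by step. The sketch below replays them in plain Java: strip the surrounding brackets, mark the boundaries between objects with ||, then split on that marker. The sample input string is an assumption made up for illustration (the real shape of the features column is not shown here), and returning an empty list when the pattern does not match is a simplification of this sketch, not a statement about Hive's behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical replay of the regexp_extract / regexp_replace / split chain. */
class CategorySplitSketch {

    static List<String> splitObjects(String categoryIdArray) {
        List<String> out = new ArrayList<>();
        if (categoryIdArray == null) {
            return out;
        }
        // regexp_extract(..., '^\\[(.+)\\]$', 1): keep what is inside [ ... ]
        Matcher m = Pattern.compile("^\\[(.+)\\]$").matcher(categoryIdArray);
        if (!m.find()) {
            return out; // simplification for this sketch
        }
        String inner = m.group(1);
        // regexp_replace(..., '\\}\\,\\{', '\\}\\|\\|\\{'): turn "},{" into "}||{"
        String marked = inner.replaceAll("\\},\\{", "}||{");
        // split(..., '\\|\\|'): cut on the literal "||" marker
        out.addAll(Arrays.asList(marked.split("\\|\\|")));
        return out;
    }

    public static void main(String[] args) {
        // Assumed sample value of the categoryId key
        String sample = "[{\"id\":1},{\"id\":2}]";
        System.out.println(splitObjects(sample));
        // prints [{"id":1}, {"id":2}]
    }
}
```

Each element of the returned list corresponds to one row of t2.categoryIdStr; the outer query then splits those strings on commas and explodes them again.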
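When the data in t1 is null, the query returns no rows; so conceptually you can think of tmp and t3 as inner joined, which is what leaves the final result empty.
In reality, however, this is not an inner join, for two reasons.
First: the virtual table t1 can sit directly under table AA, inside tmp; even though tmp does not select any of t1's columns, data derived from t1 is still reachable outside tmp. This shows that AA and t1 are not inner joined.
Second: judging from Hive's execution plan.
The operator tree clearly contains no reduce phase, which means the whole run involves no shuffle. In Hive on MapReduce, an ordinary join (map-side joins aside) shuffles the data and therefore shows up as a reduce stage in the plan. Since no reduce appears anywhere, the lateral view did not perform such a join. The Lateral View Join Operator in the plan, despite its name, only stitches each base row together with the rows the UDTF emits for it, all inside the map task.
The execution plan in Hive: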
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: aa
Filter Operator
predicate: (features like '%categoryId%') (type: boolean)
Lateral View Forward
Select Operator
Lateral View Join Operator
Lateral View Forward
Select Operator
Lateral View Join Operator
Select Operator
expressions: split(_col5, ',') (type: array
Lateral View Forward
Select Operator
Lateral View Join Operator
Select Operator
File Output Operator
Select Operator
expressions: _col1 (type: array
UDTF Operator
function name: explode
Lateral View Join Operator
outputColumnNames: _col2
Select Operator
File Output Operator
Select Operator
expressions: split(regexp_replace(regexp_extract(_col4, '^\[(.+)\]$', 1), '\}\,\{', '\}\|\|\{'), '\|\|') (type: array
UDTF Operator
function name: explode
Lateral View Join Operator
Select Operator
expressions: split(_col5, ',') (type: array
Lateral View Forward
Select Operator
Lateral View Join Operator
Select Operator
File Output Operator
Select Operator
expressions: _col1 (type: array
UDTF Operator
function name: explode
Lateral View Join Operator
Select Operator
File Output Operator
Select Operator
expressions: features (type: string), 'categoryId' (type: string), _col1
UDTF Operator
function name: json_tuple
Lateral View Join Operator
Lateral View Forward
Select Operator
Lateral View Join Operator
Select Operator
expressions: split(_col5, ',') (type: array
Lateral View Forward
Select Operator
Lateral View Join Operator
Select Operator
File Output Operator
Select Operator
expressions: _col1 (type: array
UDTF Operator
function name: explode
Lateral View Join Operator
Select Operator
File Output Operator
Select Operator
expressions: split(regexp_replace(regexp_extract(_col4, '^\[(.+)\]$', 1), '\}\,\{', '\}\|\|\{'), '\|\|') (type: array
UDTF Operator
function name: explode
Lateral View Join Operator
Select Operator
expressions: split(_col5, ',') (type: array
Lateral View Forward
Select Operator
Lateral View Join Operator
Select Operator
File Output Operator
Select Operator
expressions: _col1 (type: array
UDTF Operator
function name: explode
Lateral View Join Operator
Select Operator
File Output Operator
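In short, the overall flow is roughly what the plan above shows: everything completes on the map side, with no reduce involved. (Rant: Hive's operator tree is painful to read.)
Reading the source:
Once you are familiar with how lateral view is used, you know it is always paired with a UDTF. That pairing corresponds to the UDTF Operator step in the Hive execution plan.
A custom UDTF usually extends GenericUDTF. The Hive source ships six UDTF classes that extend GenericUDTF, spread over eight files (UDTFCollector among them deserves attention); GenericUDTFExplode is the implementation of explode.
GenericUDTF has one key method: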
// Hands the result row to the collector
protected final void forward(Object o) throws HiveException {
    collector.collect(o);
}
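Tracing upward from this method leads to the Collector interface; the only class that implements it is UDTFCollector.
UDTFCollector has one key method:
// Passes the rows collected from the GenericUDTF on to the UDTFOperator
public void collect(Object input) throws HiveException {
    op.forwardUDTFOutput(input);
    counter++;
}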
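The call chain described above (UDTF forward, collector collect, operator forwardUDTFOutput) can be replayed with a few stripped-down classes. Everything in this sketch is a hypothetical stand-in written for illustration: Operator, CountingCollector and ExplodeLike only imitate the roles of UDTFOperator, UDTFCollector and a GenericUDTF, and none of it is Hive code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified, hypothetical stand-ins for the Hive classes named above. */
class CollectorChainSketch {

    interface Collector {
        void collect(Object input);
    }

    /** Plays the role of UDTFOperator: receives rows and passes them downstream. */
    static class Operator {
        final List<Object> downstream = new ArrayList<>();

        void forwardUDTFOutput(Object row) {
            downstream.add(row);
        }
    }

    /** Plays the role of UDTFCollector: hands rows to the operator and counts them. */
    static class CountingCollector implements Collector {
        private final Operator op;
        private int counter;

        CountingCollector(Operator op) {
            this.op = op;
        }

        @Override
        public void collect(Object input) {
            op.forwardUDTFOutput(input);
            counter++;
        }

        int getCounter() {
            return counter;
        }

        void reset() {
            counter = 0;
        }
    }

    /** Plays the role of a GenericUDTF such as explode. */
    static class ExplodeLike {
        private Collector collector;

        void setCollector(Collector collector) {
            this.collector = collector;
        }

        void process(List<?> input) {
            if (input == null) {
                return; // nothing is forwarded for null input
            }
            for (Object element : input) {
                collector.collect(element); // the "forward" step
            }
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator();
        CountingCollector collector = new CountingCollector(op);
        ExplodeLike udtf = new ExplodeLike();
        udtf.setCollector(collector);

        udtf.process(Arrays.asList("x", "y"));
        System.out.println(op.downstream + " emitted=" + collector.getCounter());
        // prints [x, y] emitted=2
    }
}
```

The counter is what lets the operator tell afterwards whether the UDTF emitted anything for the current input row.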
UDTFOperator has three key methods:
// Initialization of the UDTF operator
protected void initializeOp(Configuration hconf) throws HiveException {
    genericUDTF = conf.getGenericUDTF();
    collector = new UDTFCollector(this);
    genericUDTF.setCollector(collector);
    udtfInputOI = (StructObjectInspector) inputObjInspectors[0];
    objToSendToUDTF = new Object[udtfInputOI.getAllStructFieldRefs().size()];
    MapredContext context = MapredContext.get();
    if (context != null) {
        context.setup(genericUDTF);
    }
    StructObjectInspector udtfOutputOI = genericUDTF.initialize(udtfInputOI);
    if (conf.isOuterLV()) {
        outerObj = Arrays.asList(new Object[udtfOutputOI.getAllStructFieldRefs().size()]);
    }
    if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVEUDTFAUTOPROGRESS)) {
        autoProgressor = new AutoProgressor(this.getClass().getName(), reporter,
            Utilities.getDefaultNotificationInterval(hconf),
            HiveConf.getTimeVar(
                hconf, HiveConf.ConfVars.HIVES_AUTO_PROGRESS_TIMEOUT, TimeUnit.MILLISECONDS));
        autoProgressor.go();
    }
    super.initializeOp(hconf);
}
// Per-row processing of the UDTF operator
public void processOp(Object row, int tag) throws HiveException {
    // The UDTF expects arguments in an object[]
    StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
    List<? extends StructField> fields = soi.getAllStructFieldRefs();
    for (int i = 0; i < fields.size(); i++) {
        objToSendToUDTF[i] = soi.getStructFieldData(row, fields.get(i));
    }
    genericUDTF.process(objToSendToUDTF);
    if (conf.isOuterLV() && collector.getCounter() == 0) {
        collector.collect(outerObj);
    }
    collector.reset();
}
// Output
public void forwardUDTFOutput(Object o) throws HiveException {
    // Since the output of the UDTF is a struct, we can just forward that
    forward(o, outputObjInspector);
}
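The parts to focus on:
The initializeOp method:
if (conf.isOuterLV()) {
    outerObj = Arrays.asList(new Object[udtfOutputOI.getAllStructFieldRefs().size()]);
}
This means: when the UDTFOperator is initialized and isOuterLV() is true, it builds outerObj, a list with one null entry per field of the UDTF's output struct, i.e. an all-null output row.
The processOp method:
genericUDTF.process(objToSendToUDTF);
if (conf.isOuterLV() && collector.getCounter() == 0) {
    collector.collect(outerObj);
}
This means: if isOuterLV() is true and the UDTF emitted no row for the current input row (collector.getCounter() == 0), the all-null outerObj is handed to the collector in its place.
Both snippets call isOuterLV(); here is the source of the class that defines it: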
@Explain(displayName = "UDTF Operator")
public class UDTFDesc extends AbstractOperatorDesc {
    private static final long serialVersionUID = 1L;
    private GenericUDTF genericUDTF;
    private boolean outerLV;

    public UDTFDesc() {
    }

    public UDTFDesc(final GenericUDTF genericUDTF, boolean outerLV) {
        this.genericUDTF = genericUDTF;
        this.outerLV = outerLV;
    }

    public GenericUDTF getGenericUDTF() {
        return genericUDTF;
    }

    public void setGenericUDTF(final GenericUDTF genericUDTF) {
        this.genericUDTF = genericUDTF;
    }

    @Explain(displayName = "function name")
    public String getUDTFName() {
        return genericUDTF.toString();
    }

    public boolean isOuterLV() {
        return outerLV;
    }

    public void setOuterLV(boolean outerLV) {
        this.outerLV = outerLV;
    }

    @Explain(displayName = "outer lateral view")
    public String isOuterLateralView() {
        return outerLV ? "true" : null;
    }
}
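The isOuterLateralView() method shows that Hive supports the lateral view outer syntax. When a query is written that way, the flag is true.
Having followed the source this far and tracing back up, you can see that with lateral view outer every input row is guaranteed at least one output row: if the UDTF emits nothing for a row, an all-null row is emitted in its place. So lateral view outer UDTF() can be thought of as a left join between the base table and the virtual table. Note: no join actually takes place; this is only a way of reading the result.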
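The difference between the two forms comes down to the counter check, which a small plain-Java model can show. OuterLateralViewSketch and its methods are hypothetical names for illustration; the string NULL stands for the all-null outerObj row, and the base rows are identified by their position in the input list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of lateral view with and without the outer flag. */
class OuterLateralViewSketch {

    /** Stand-in for explode(): null input emits nothing. */
    static List<String> explode(List<String> input) {
        if (input == null) {
            return new ArrayList<>();
        }
        return input;
    }

    static List<String> lateralView(List<List<String>> baseColumn, boolean outer) {
        List<String> out = new ArrayList<>();
        for (int rowId = 0; rowId < baseColumn.size(); rowId++) {
            int counter = 0; // rows emitted for the current input row
            for (String value : explode(baseColumn.get(rowId))) {
                out.add(rowId + ":" + value);
                counter++;
            }
            if (outer && counter == 0) {
                out.add(rowId + ":NULL"); // placeholder row keeps the base row alive
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> base = new ArrayList<>();
        base.add(Arrays.asList("a", "b"));
        base.add(null);               // null array
        base.add(new ArrayList<>());  // empty array

        System.out.println(lateralView(base, false));
        // prints [0:a, 0:b]
        System.out.println(lateralView(base, true));
        // prints [0:a, 0:b, 1:NULL, 2:NULL]
    }
}
```

Without the flag, rows 1 and 2 vanish (inner-join-like); with it, they survive with a null in the generated column (left-join-like), and in both cases each row is processed on its own.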
GenericUDTFExplode has two key methods:
// Initialization
public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    if (args.length != 1) {
        throw new UDFArgumentException("explode() takes only one argument");
    }
    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    switch (args[0].getCategory()) {
    case LIST:
        inputOI = args[0];
        fieldNames.add("col");
        fieldOIs.add(((ListObjectInspector)inputOI).getListElementObjectInspector());
        break;
    case MAP:
        inputOI = args[0];
        fieldNames.add("key");
        fieldNames.add("value");
        fieldOIs.add(((MapObjectInspector)inputOI).getMapKeyObjectInspector());
        fieldOIs.add(((MapObjectInspector)inputOI).getMapValueObjectInspector());
        break;
    default:
        throw new UDFArgumentException("explode() takes an array or a map as a parameter");
    }
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
        fieldOIs);
}
// Per-row execution
public void process(Object[] o) throws HiveException {
    switch (inputOI.getCategory()) {
    case LIST:
        ListObjectInspector listOI = (ListObjectInspector)inputOI;
        List<?> list = listOI.getList(o[0]);
        if (list == null) {
            return;
        }
        for (Object r : list) {
            forwardListObj[0] = r;
            forward(forwardListObj);
        }
        break;
    case MAP:
        MapObjectInspector mapOI = (MapObjectInspector)inputOI;
        Map<?,?> map = mapOI.getMap(o[0]);
        if (map == null) {
            return;
        }
        for (Entry<?,?> r : map.entrySet()) {
            forwardMapObj[0] = r.getKey();
            forwardMapObj[1] = r.getValue();
            forward(forwardMapObj);
        }
        break;
    default:
        throw new TaskExecutionException("explode() can only operate on an array or a map");
    }
}
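Note the null checks in process(), for example:
if (map == null) {
    return;
}
When the input is null, process() returns without forwarding anything; it does not emit a null row. In the MR job this means the map side gets no UDTF output for that input row, so the base row has nothing to be paired with and drops out of the result. For ease of understanding, this is the same outcome as an inner join between the virtual table and the base table.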