函数文档下载:Hive-LanguageManualUDF.pdf
官方地址:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
自定义函数包括三种 UDF、UDAF、UDTF
UDF(User-Defined-Function) 一进一出 ,既一行进一行出
UDAF(User- Defined Aggregation Funcation) 聚集函数,多进一出(多行进一行出)。Count/max/min
UDTF(User-Defined Table-Generating Functions)一进多出,如 explore()
package udf;
import org.apache.hadoop.hive.ql.exec.UDF;
/**
* UDF,一次处理一行的数据
* @author Jeffrey.Deng
* @date 2017-09-19
*/
public class FormatIntUDF extends UDF{
public String evaluate(String str) {
return evaluate(str, 5);
}
public String evaluate(String str, int length) { //重载
try {
return String.format("%0"+ length +"d", Integer.valueOf(str));
} catch (Exception e) {
return str;
}
}
public String evaluate(int num) {
return evaluate(num, 5);
} //重载
public String evaluate(int num, int length) { //重载
try {
return String.format("%0"+ length +"d", num);
} catch (Exception e) {
return num + "";
}
}
}
hive> add jar udf.jar;
hive> create temporary function format_int as 'udf.FormatIntUDF';
hive> select format_int(t.col1) from t limit 10;
hive> drop temporary function format_int;
以上为一种简单UDF函数写法,但此种写法执行时是利用反射的,所以性能不高,
所以一般工作中,是继承 org.apache.hadoop.hive.ql.udf.generic.GenericUDF
下面是一个 split功能的 UDF函数实现 :
package udf;
import org.apache.hadoop.hive.ql.exec.TaskExecutionException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
import java.util.ArrayList;
/**
* @author Jeffrey.Deng
* @date 2017-09-24
*/
public class MySplitUDF extends GenericUDF {
private transient ObjectInspectorConverters.Converter[] converters;
/**
* 初始化
* 设置输出类型
*/
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
if (arguments.length != 2) {
throw new UDFArgumentLengthException("The function SPLIT(s, regexp) takes exactly 2 arguments.");
}
converters = new ObjectInspectorConverters.Converter[arguments.length];
for (int i = 0; i < arguments.length; i++) {
// A converter which will convert objects with one ObjectInspector to another.
// 创建一个string转换器将其他类型转成string类型
converters[i] = ObjectInspectorConverters.getConverter(arguments[i], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
}
// 返回输出类型为 泛型为字符串的List类型
return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
}
// 一行执行一次
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
if (arguments.length != 2) {
throw new TaskExecutionException("mysplit should input two argument");
}
if (arguments[0].get() == null || arguments[1].get() == null) {
return null;
}
Text s = (Text) converters[0].convert(arguments[0].get()); // 将其他类型转成string类型
Text regex = (Text) converters[1].convert(arguments[1].get());
ArrayList<Text> result = new ArrayList<>();
for (String str : s.toString().split(regex.toString(), -1)) { // 按设置的分隔符切割
result.add(new Text(str));
}
return result; // 返回数组
}
@Override
public String getDisplayString(String[] children) {
return getStandardDisplayString("split", children);
}
}
UDAF是聚合函数,相当于reduce,将表中多行数据聚合成一行结果
UDAF是需要在hive的sql语句和group by联合使用,hive的group by对于每个分组,只能返回一条记录,这点和mysql不一样。
hive有两种UDAF:简单和通用。
顾名思义,简单的UDAF,写的相当简单的,继承UDAF就可以了,但因为使用Java反射导致性能损失,而且有些特性不能使用,所以过时了。
通用UDAF 可以使用所有功能,但实现比较复杂。
开发通用 UDAF 有两个步骤:
第一个是编写 resolver 类,第二个是编写 evaluator 类。
resolver 负责类型检查,操作符重载,里面创建evaluator类对象
evaluator 真正实现UDAF的逻辑。
(1)、实现 resolver 类:
resolver 通常继承 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2,
但是更建议继承 AbstractGenericUDAFResolver,隔离将来hive接口的变化。
( AbstractGenericUDAFResolver 继承 GenericUDAFResolver 和 GenericUDAFResolver2 )
GenericUDAFResolver 和 GenericUDAFResolver2 接口的区别是:
Resolver2 允许evaluator实现利用 GenericUDAFParameterInfo 可以访问更多的信息,例如 DISTINCT限定符,通配符(*)。
AbstractGenericUDAFResolver类源码:
package org.apache.hadoop.hive.ql.udf.generic;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
/**
* An abstract class to help facilitate existing implementations of
* <tt>GenericUDAFResolver</tt> to migrate towards the newly introduced
* interface {@link GenericUDAFResolver2}. This class provides a default
* implementation of this new API and in turn calls
* the existing API {@link GenericUDAFResolver#getEvaluator(TypeInfo[])} by
* ignoring the extra parameter information available via the
* <tt>GenericUDAFParameterInfo</tt> interface.
*
*/
public abstract class AbstractGenericUDAFResolver implements GenericUDAFResolver2 {
// 返回一个GenericUDAFEvaluator的实现类对象就可以了
@SuppressWarnings("deprecation")
@Override
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) // 继承自GenericUDAFResolver2
throws SemanticException {
if (info.isAllColumns()) {
throw new SemanticException(
"The specified syntax for UDAF invocation is invalid.");
}
return getEvaluator(info.getParameters());
}
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException { // 继承自GenericUDAFResolver
throw new SemanticException("This UDAF does not support the deprecated getEvaluator() method.");
}
}
GenericUDAFResolver 只需要重写一个方法:getEvaluator:它根据SQL传入的参数类型,返回正确的 evaluator实现。这里最主要是实现 evaluator 的重载。
getEvaluator(TypeInfo[] info) 继承自 GenericUDAFResolver ,只能判断参数的个数类型
getEvaluator(GenericUDAFParameterInfo info) 继承自 GenericUDAFResolver2 ,还能判断 通配符(*) 、DISTINCT限定符
继承 AbstractGenericUDAFResolver 实现一个 count 操作的 UDAF,Resolver部分:
public class CountUDAF extends AbstractGenericUDAFResolver {
private static final Log log = LogFactory.getLog(CountUDAF.class.getName());
// 构建方法,传入的是函数指定的列
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] params) throws SemanticException {
if (params.length > 1) {
throw new UDFArgumentLengthException("Exactly one argument is expected.");
}
return new CountUDAFEvaluator();
}
// 这个构建方法可以判输入的是参数是*号或者distinct
@Override
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
ObjectInspector[] parameters = info.getParameterObjectInspectors();
boolean isAllColumns = false;
if (parameters.length == 0) {
if (!info.isAllColumns()) {
throw new UDFArgumentException("Argument expected");
}
if (info.isDistinct()) {
throw new UDFArgumentException("DISTINCT not supported with *");
}
isAllColumns = true;
} else if (parameters.length != 1) {
throw new UDFArgumentLengthException("Exactly one argument is expected.");
}
return new CountUDAFEvaluator(isAllColumns);
}
}
protected static class CountUDAFEvaluator extends GenericUDAFEvaluator {
private boolean isAllColumns = false;
private LongObjectInspector aggOI; // 合并结果的类型
private LongWritable result;
public CountUDAFEvaluator() {
}
public CountUDAFEvaluator(boolean isAllCols) {
isAllColumns = isAllCols;
}
/**
* 初始化
*
* @param m 代表此时在map-reduce哪个阶段,因为不同的阶段可能在不同的机器上执行,需要重新创建对象
* partial1、partial2、final、complete
* @param parameters partial1或complete阶段传入的parameters类型是原始输入数据的类型
* partial2和final阶段(执行合并)的parameters类型是partial-aggregations(既合并返回结果的类型),此时parameters长度肯定只有1了
* @return ObjectInspector
* 在partial1和partial2阶段返回局部合并结果的类型,既terminatePartial的类型
* 在complete或final阶段返回总结果的类型,既terminate的类型
*/
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
super.init(m, parameters);
// 当是combiner和reduce阶段时,获取合并结果的类型,因为需要执行merge方法
// merge方法需要部分合并的结果类型来取得值
if (m == Mode.PARTIAL2 || m == Mode.FINAL) {
aggOI = (LongObjectInspector) parameters[0];
}
result = new LongWritable(0); // 保存总结果
return PrimitiveObjectInspectorFactory.writableLongObjectInspector; // 局部合并结果的类型和总合并结果的类型都是long
}
// 定义一个AbstractAggregationBuffer类来缓存合并值
static class CountAgg extends AbstractAggregationBuffer {
long value;
// 返回类型占的字节数,long为8
@Override
public int estimate() {
return JavaDataModel.PRIMITIVES2;
}
}
// 创建缓存合并值的buffer
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
CountAgg countAgg = new CountAgg();
reset(countAgg);
return countAgg;
}
// 重置合并值
@Override
public void reset(AggregationBuffer agg) throws HiveException {
((CountAgg) agg).value = 0;
}
// map时执行,迭代数据
@Override
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
// parameters为输入数据
// parameters == null means the input table/split is empty
if (parameters == null) {
return;
}
if (isAllColumns) {
((CountAgg) agg).value++;
} else {
boolean countThisRow = true;
for (Object nextParam : parameters) {
if (nextParam == null) {
countThisRow = false;
break;
}
}
if (countThisRow) {
((CountAgg) agg).value++;
}
}
}
// 返回buffer中部分聚合结果,map结束和combiner结束执行
@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
return terminate(agg);
}
// 合并结果,combiner或reduce时执行
@Override
public void merge(AggregationBuffer agg, Object partial) throws HiveException {
if (partial != null) {
((CountAgg) agg).value += aggOI.get(partial); // 累加部分聚合的结果
}
}
// 返回buffer中总结果,reduce结束执行或者没有reduce时map结束执行
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
result.set(((CountAgg) agg).value); // 每一组执行一次(group by)
return result; // 返回writable类型
}
}
public static enum Mode {
/**
* PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
* 将会调用 iterate()和terminatePartial()
*/
PARTIAL1,
/**
* PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据:部分数据聚合
* 将会调用 merge() 和 terminatePartial()
*/
PARTIAL2,
/**
* FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合
* 将会调用 merge()和terminate()
*/
FINAL,
/**
* COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
* 将会调用 iterate()和terminate()
*/
COMPLETE
};
一般情况下,完整的UDAF逻辑是一个mapreduce过程,三种情况:
A、如果有mapper和reducer:
就会经历 PARTIAL1(mapper),FINAL(reducer),
B、如果还有combiner:
那就会经历 PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。
C、而有一些情况下的mapreduce,只有mapper,而没有reducer:
所以就会只有 COMPLETE 阶段,这个阶段直接输入原始数据,出结果。
2、其中聚合的累加结果需要创建一个类保存,继承自 AbstractAggregationBuffer,(AggregationBuffer过时)
例子:
// 定义一个AbstractAggregationBuffer类来缓存合并值
static class CountAgg extends AbstractAggregationBuffer {
long value;
// 返回类型占的字节数,long为8
@Override
public int estimate() {
return JavaDataModel.PRIMITIVES2;
}
}
// 创建缓存合并值的buffer
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
CountAgg countAgg = new CountAgg();
reset(countAgg);
return countAgg;
}
// 重置合并值
@Override
public void reset(AggregationBuffer agg) throws HiveException {
((CountAgg) agg).value = 0;
}
(3)、完整自定义count例子:
package udf;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.LongWritable;
/**
* 简单的UDAF,写的相当简单的,但因为使用Java反射导致性能损失,而且有些特性不能使用
* 所以一般开发通用UDAF函数
* <p>
* 开发通用UDAF有两个步骤:
* 第一个是编写resolver类,第二个是编写evaluator类。
* resolver负责类型检查,操作符重载。
* evaluator真正实现UDAF的逻辑。
* 通常来说,顶层UDAF类继承{@link org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2},
* 里面编写嵌套类 evaluator 实现UDAF的逻辑。
* <p>
* resolver通常继承org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2,但是更建议继承AbstractGenericUDAFResolver,隔离将来hive接口的变化。
* GenericUDAFResolver和GenericUDAFResolver2接口的区别是:
* 后面的允许evaluator实现利用GenericUDAFParameterInfo可以访问更多的信息,例如DISTINCT限定符,通配符(*)。
*
* @author Jeffrey.Deng
* @date 2017-09-24
*/
public class CountUDAF extends AbstractGenericUDAFResolver {
private static final Log log = LogFactory.getLog(CountUDAF.class.getName());
// 构建方法,传入的是函数指定的列
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] params) throws SemanticException {
if (params.length > 1) {
throw new UDFArgumentLengthException("Exactly one argument is expected.");
}
return new CountUDAFEvaluator();
}
// 这个构建方法可以判输入的是参数是*号或者distinct
@Override
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
ObjectInspector[] parameters = info.getParameterObjectInspectors();
boolean isAllColumns = false;
if (parameters.length == 0) {
if (!info.isAllColumns()) {
throw new UDFArgumentException("Argument expected");
}
if (info.isDistinct()) {
throw new UDFArgumentException("DISTINCT not supported with *");
}
isAllColumns = true;
} else if (parameters.length != 1) {
throw new UDFArgumentLengthException("Exactly one argument is expected.");
}
return new CountUDAFEvaluator(isAllColumns);
}
/**
* GenericUDAFEvaluator类实现UDAF的逻辑
* <p>
* enum Mode 运行阶段枚举类
* PARTIAL1:
* 这个是mapreduce的map阶段:从原始数据到部分数据聚合
* 将会调用iterate()和terminatePartial()
* PARTIAL2:
* 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据:部分数据聚合
* 将会调用merge() 和 terminatePartial()
* FINAL:
* mapreduce的reduce阶段:从部分数据的聚合到完全聚合
* 将会调用merge()和terminate()
* COMPLETE:
* 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
* 将会调用 iterate()和terminate()
*/
@SuppressWarnings("deprecation")
protected static class CountUDAFEvaluator extends GenericUDAFEvaluator {
private boolean isAllColumns = false;
private LongObjectInspector aggOI; // 合并结果的类型
private LongWritable result;
public CountUDAFEvaluator() {
}
public CountUDAFEvaluator(boolean isAllCols) {
isAllColumns = isAllCols;
}
/**
* 初始化
*
* @param m 代表此时在map-reduce哪个阶段,因为不同的阶段可能在不同的机器上执行,需要重新创建对象
* partial1、partial2、final、complete
* @param parameters partial1或complete阶段传入的parameters类型是原始输入数据的类型
* partial2和final阶段(执行合并)的parameters类型是partial-aggregations(既合并返回结果的类型),此时parameters长度肯定只有1了
* @return ObjectInspector
* 在partial1和partial2阶段返回局部合并结果的类型,既terminatePartial的类型
* 在complete或final阶段返回总结果的类型,既terminate的类型
*/
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
super.init(m, parameters);
// 当是combiner和reduce阶段时,获取合并结果的类型,因为需要执行merge方法
// merge方法需要部分合并的结果类型来取得值
if (m == Mode.PARTIAL2 || m == Mode.FINAL) {
aggOI = (LongObjectInspector) parameters[0];
}
result = new LongWritable(0); // 保存总结果
return PrimitiveObjectInspectorFactory.writableLongObjectInspector; // 局部合并结果的类型和总合并结果的类型都是long
}
// 定义一个AbstractAggregationBuffer类来缓存合并值
static class CountAgg extends AbstractAggregationBuffer {
long value;
// 返回类型占的字节数,long为8
@Override
public int estimate() {
return JavaDataModel.PRIMITIVES2;
}
}
// 创建缓存合并值的buffer
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
CountAgg countAgg = new CountAgg();
reset(countAgg);
return countAgg;
}
// 重置合并值
@Override
public void reset(AggregationBuffer agg) throws HiveException {
((CountAgg) agg).value = 0;
}
// map时执行,迭代数据
@Override
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
// parameters为输入数据
// parameters == null means the input table/split is empty
if (parameters == null) {
return;
}
if (isAllColumns) {
((CountAgg) agg).value++;
} else {
boolean countThisRow = true;
for (Object nextParam : parameters) {
if (nextParam == null) {
countThisRow = false;
break;
}
}
if (countThisRow) {
((CountAgg) agg).value++;
}
}
}
// 返回buffer中部分聚合结果,map结束和combiner结束执行
@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
return terminate(agg);
}
// 合并结果,combiner或reduce时执行
@Override
public void merge(AggregationBuffer agg, Object partial) throws HiveException {
if (partial != null) {
((CountAgg) agg).value += aggOI.get(partial); // 累加部分聚合的结果
}
}
// 返回buffer中总结果,reduce结束执行或者没有reduce时map结束执行
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
result.set(((CountAgg) agg).value); // 每一组执行一次(group by)
return result; // 返回writable类型
}
}
}
使用:
hive> add jar /root/2017/udf.jar
hive> create temporary function mycount as 'udf.CountUDAF'
hive> select call, mycount(*) as cn from beauty group by call order by cn desc
hive> select tag, mycount(tag) as cn from beauty lateral view explode(tags) lve_beauty as tag group by tag order by cn desc
UDTF
UDTF(User-Defined Table-Generating Functions)用来解决 输入一行输出多行(On-to-many maping) 的需求。
限制:
1、No other expressions are allowed in SELECT 不能和其他字段一起使用
SELECT pageid, explode(adid_list) AS myCol... is not supported
2、UDTF's can't be nested 不能嵌套
SELECT explode(explode(adid_list)) AS myCol... is not supported
3、GROUP BY / CLUSTER BY / DISTRIBUTE BY / SORT BY is not supported
SELECT explode(adid_list) AS myCol ... GROUP BY myCol is not supported
继承 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF ,实现 initialize, process, close 三个方法。
执行过程:
1、UDTF首先会调用 initialize 方法,此方法返回UDTF的输出行的信息(输出列个数与类型)
2、初始化完成后,会调用 process 方法,真正的处理过程在process函数中:
在process中,每一次 forward() 调用产生一行;
如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到 forward() 函数。
3、 最后 close() 方法调用,对需要清理的方法进行清理。
下面是实现一个explode函数的例子:
explode会将一个数组中每个元素都输出一行,map中每对key-value都输出一行,实现对数据展开
package udf;
import org.apache.hadoop.hive.ql.exec.TaskExecutionException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author Jeffrey.Deng
* @date 2017-09-25
*/
public class MyExplodeUDTF extends GenericUDTF {
private transient ObjectInspector inputOI = null;
/**
* 初始化
* 构建一个StructObjectInspector类型用于输出
* 其中stuct的字段构成输出的一行。
* 字段名称不重要,因为它们将被用户提供的列别名覆盖。
*/
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
// 以前的initialize(ObjectInspector[] argOIs)方法失效,下面这里是新方法里获取ObjectInspector[]的写法
List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs(); // 得到结构体的字段
ObjectInspector[] udtfInputOIs = new ObjectInspector[inputFields.size()];
for (int i = 0; i < inputFields.size(); i++) {
udtfInputOIs[i] = inputFields.get(i).getFieldObjectInspector(); // 字段类型
}
if (udtfInputOIs.length != 1) {
throw new UDFArgumentLengthException("explode() takes only one argument");
}
List<String> fieldNames = new ArrayList<>();
List<ObjectInspector> fieldOIs = new ArrayList<>();
switch (udtfInputOIs[0].getCategory()) {
case LIST:
inputOI = udtfInputOIs[0];
fieldNames.add("col"); // 指定list生成的列名, 可在as后覆写
fieldOIs.add(((ListObjectInspector) inputOI).getListElementObjectInspector()); // 获取list元素的类型
break;
case MAP:
inputOI = udtfInputOIs[0];
fieldNames.add("key"); // 指定map中key的生成的列名, 可在as后覆写
fieldNames.add("value"); // 指定map中value的生成的列名, 可在as后覆写
fieldOIs.add(((MapObjectInspector) inputOI).getMapKeyObjectInspector()); // 得到map中key的类型
fieldOIs.add(((MapObjectInspector) inputOI).getMapValueObjectInspector()); // 得到map中value的类型
break;
default:
throw new UDFArgumentException("explode() takes an array or a map as a parameter");
}
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); // 创建一个Struct类型返回
}
private transient Object[] forwardListObj = new Object[1]; // 输出list
private transient Object[] forwardMapObj = new Object[2]; // 输出map
// 每行执行一次,输入数据args
// 每调用forward,输出一行
@Override
public void process(Object[] args) throws HiveException {
switch (inputOI.getCategory()) {
case LIST:
ListObjectInspector listOI = (ListObjectInspector) inputOI;
List<?> list = listOI.getList(args[0]);
if (list == null) {
return;
}
for (Object o : list) { // list中每个元素输出一行
forwardListObj[0] = o;
forward(forwardListObj);
}
break;
case MAP:
MapObjectInspector mapOI = (MapObjectInspector) inputOI;
Map<?, ?> map = mapOI.getMap(args[0]);
if (map == null) {
return;
}
for (Map.Entry<?, ?> entry : map.entrySet()) { // map中每一对输出一行
forwardMapObj[0] = entry.getKey();
forwardMapObj[1] = entry.getValue();
forward(forwardMapObj);
}
break;
default:
throw new TaskExecutionException("explode() can only operate on an array or a map");
}
}
@Override
public void close() throws HiveException {
}
}
使用:
hive> add jar /root/2017/udf.jar
hive> create temporary function myexplode as 'udf.MyExplodeUDTF'
hive> select myexplode(tags) as tag from beauty
hive> select myexplode(props) as (k,v) from beauty
hive> select tag, count(tag) as cn from beauty lateral view myexplode(tags) lve_beauty as tag group by tag order by cn desc