Hive函数
一、内置函数
 函数文档下载:Hive-LanguageManualUDF.pdf
 官方地址:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
二、自定义函数
自定义函数包括三种 UDF、UDAF、UDTF 
        UDF(User-Defined-Function) 一进一出 ,既一行进一行出
        UDAF(User- Defined Aggregation Funcation) 聚集函数,多进一出(多行进一行出)。Count/max/min 
        UDTF(User-Defined Table-Generating Functions)一进多出,如 explore() 
UDF 开发
    1、UDF 函数可以直接应用于 select 语句,对查询结构做格式化处理后,再输出内容。 
    2、编写 UDF 函数的时候需要注意一下几点: 
        a)自定义 UDF 需要继承 org.apache.hadoop.hive.ql.UDF。 
        b)需要实现 evaluate 函数,evaluate 函数支持重载。 
package udf;

import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * UDF,一次处理一行的数据
 * @author Jeffrey.Deng
 * @date 2017-09-19
 */
public class FormatIntUDF extends UDF{

    public String evaluate(String str) {
        return evaluate(str, 5);
    }

    public String evaluate(String str, int length) {    //重载
        try {
            return String.format("%0"+ length +"d", Integer.valueOf(str));
        } catch (Exception e) {
            return str;
        }
    }

    public String evaluate(int num) {
        return evaluate(num, 5);
    }    //重载
    
    public String evaluate(int num, int length) {    //重载
        try {
            return String.format("%0"+ length +"d", num);
        } catch (Exception e) {
            return num + "";
        }
    }
}
    3、步骤 
        a)把程序打包放到目标机器上去(hiveserver )
        b)进入 hive 客户端,添加 jar 包:
hive> add jar udf.jar; 
                上传 jar  要上传到 hiveserver上,不要在客户端 
                Add jar path:hiveserver 主机的地址 而不是 hdfs 地址 
        c)创建临时函数:
hive> create temporary function format_int as 'udf.FormatIntUDF'; 
        d)查询 HQL 语句: 
hive> select format_int(t.col1) from t limit 10;
        e)销毁临时函数:
hive> drop temporary function format_int;
        注意
            format_int为临时的函数,所以每次进入hive都需要add jar以及create temporary操作;
            永久函数,去掉 temporary
            UDF只能实现一进一出的操作,如果需要实现多进一出,则需要实现UDAF

以上为一种简单UDF函数写法,但此种写法执行时是利用反射的,所以性能不高,
所以一般工作中,是继承 org.apache.hadoop.hive.ql.udf.generic.GenericUDF
下面是一个 split功能的 UDF函数实现 :
package udf;

import org.apache.hadoop.hive.ql.exec.TaskExecutionException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

import java.util.ArrayList;

/**
 * @author Jeffrey.Deng
 * @date 2017-09-24
 */
public class MySplitUDF extends GenericUDF {

    private transient ObjectInspectorConverters.Converter[] converters;

    /**
     * 初始化
     * 设置输出类型
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

        if (arguments.length != 2) {
            throw new UDFArgumentLengthException("The function SPLIT(s, regexp) takes exactly 2 arguments.");
        }

        converters = new ObjectInspectorConverters.Converter[arguments.length];
        for (int i = 0; i < arguments.length; i++) {
            // A converter which will convert objects with one ObjectInspector to another.
            // 创建一个string转换器将其他类型转成string类型
            converters[i] = ObjectInspectorConverters.getConverter(arguments[i], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        }
        // 返回输出类型为 泛型为字符串的List类型
        return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    }

    // 一行执行一次
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        if (arguments.length != 2) {
            throw new TaskExecutionException("mysplit should input two argument");
        }

        if (arguments[0].get() == null || arguments[1].get() == null) {
            return null;
        }

        Text s = (Text) converters[0].convert(arguments[0].get()); // 将其他类型转成string类型
        Text regex = (Text) converters[1].convert(arguments[1].get());

        ArrayList<Text> result = new ArrayList<>();

        for (String str : s.toString().split(regex.toString(), -1)) { // 按设置的分隔符切割
            result.add(new Text(str));
        }

        return result; // 返回数组
    }

    @Override
    public String getDisplayString(String[] children) {
        return getStandardDisplayString("split", children);
    }

}
    重写 initialize() 与 evaluate() 方法:
            initialize() 中返回输出的类型
            evaluate() 每行执行一次,返回处理结果

UDAF 
UDAF是聚合函数,相当于reduce,将表中多行数据聚合成一行结果
UDAF是需要在hive的sql语句和group by联合使用,hive的group by对于每个分组,只能返回一条记录,这点和mysql不一样。
hive有两种UDAF:简单和通用
    顾名思义,简单的UDAF,写的相当简单的,继承UDAF就可以了,但因为使用Java反射导致性能损失,而且有些特性不能使用,所以过时了。
    通用UDAF 可以使用​​所有功能,但实现比较复杂。

开发通用 UDAF 有两个步骤:

    第一个是编写 resolver 类,第二个是编写 evaluator 类。

    resolver 负责类型检查,操作符重载,里面创建evaluator类对象

    evaluator 真正实现UDAF的逻辑。

(1)、实现 resolver 类:

resolver 通常继承 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2

        但是更建议继承 AbstractGenericUDAFResolver,隔离将来hive接口的变化。

    ( AbstractGenericUDAFResolver 继承 GenericUDAFResolver 和 GenericUDAFResolver2 )

GenericUDAFResolver 和 GenericUDAFResolver2 接口的区别是:

       Resolver2 允许evaluator实现利用 GenericUDAFParameterInfo 可以访问更多的信息,例如 DISTINCT限定符,通配符(*)

AbstractGenericUDAFResolver类源码:

package org.apache.hadoop.hive.ql.udf.generic;

import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

/**
 * An abstract class to help facilitate existing implementations of
 * <tt>GenericUDAFResolver</tt> to migrate towards the newly introduced
 * interface {@link GenericUDAFResolver2}. This class provides a default
 * implementation of this new API and in turn calls
 * the existing API {@link GenericUDAFResolver#getEvaluator(TypeInfo[])} by
 * ignoring the extra parameter information available via the
 * <tt>GenericUDAFParameterInfo</tt> interface.
 *
 */
public abstract class AbstractGenericUDAFResolver implements GenericUDAFResolver2 {

  // 返回一个GenericUDAFEvaluator的实现类对象就可以了

  @SuppressWarnings("deprecation")
  @Override
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) // 继承自GenericUDAFResolver2
    throws SemanticException {

    if (info.isAllColumns()) {
      throw new SemanticException(
          "The specified syntax for UDAF invocation is invalid.");
    }

    return getEvaluator(info.getParameters());
  }

  @Override
  public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {  // 继承自GenericUDAFResolver
    throw new SemanticException("This UDAF does not support the deprecated getEvaluator() method.");
  }
  
}
GenericUDAFResolver 只需要重写一个方法:getEvaluator

    它根据SQL传入的参数类型,返回正确的 evaluator实现。这里最主要是实现 evaluator 的重载。
    getEvaluator(TypeInfo[] info)  继承自 GenericUDAFResolver ,只能判断参数的个数类型
    getEvaluator(GenericUDAFParameterInfo info) 继承自 GenericUDAFResolver2 ,还能判断 通配符(*) 、DISTINCT限定符

继承 AbstractGenericUDAFResolver 实现一个 count 操作的 UDAF,Resolver部分:

public class CountUDAF extends AbstractGenericUDAFResolver {

    private static final Log log = LogFactory.getLog(CountUDAF.class.getName());

    // 构建方法,传入的是函数指定的列
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] params) throws SemanticException {
        if (params.length > 1) {
            throw new UDFArgumentLengthException("Exactly one argument is expected.");
        }
        return new CountUDAFEvaluator();
    }

    // 这个构建方法可以判输入的是参数是*号或者distinct
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        ObjectInspector[] parameters = info.getParameterObjectInspectors();
        boolean isAllColumns = false;
        if (parameters.length == 0) {
            if (!info.isAllColumns()) {
                throw new UDFArgumentException("Argument expected");
            }
            if (info.isDistinct()) {
                throw new UDFArgumentException("DISTINCT not supported with *");
            }
            isAllColumns = true;
        } else if (parameters.length != 1) {
            throw new UDFArgumentLengthException("Exactly one argument is expected.");
        }
        return new CountUDAFEvaluator(isAllColumns);
    }
}
(2)、实现evaluator
    所有evaluators必须继承抽象类 org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
        子类必须实现它的一些抽象方法,实现UDAF的逻辑。
    下面为 自定义count 的 evaluator类 实现:
protected static class CountUDAFEvaluator extends GenericUDAFEvaluator {

        private boolean isAllColumns = false;

        private LongObjectInspector aggOI; // 合并结果的类型

        private LongWritable result;

        public CountUDAFEvaluator() {
        }

        public CountUDAFEvaluator(boolean isAllCols) {
            isAllColumns = isAllCols;
        }

        /**
         * 初始化
         *
         * @param m          代表此时在map-reduce哪个阶段,因为不同的阶段可能在不同的机器上执行,需要重新创建对象
         *                   partial1、partial2、final、complete
         * @param parameters partial1或complete阶段传入的parameters类型是原始输入数据的类型
         *                   partial2和final阶段(执行合并)的parameters类型是partial-aggregations(既合并返回结果的类型),此时parameters长度肯定只有1了
         * @return ObjectInspector
         * 在partial1和partial2阶段返回局部合并结果的类型,既terminatePartial的类型
         * 在complete或final阶段返回总结果的类型,既terminate的类型
         */
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            // 当是combiner和reduce阶段时,获取合并结果的类型,因为需要执行merge方法
            // merge方法需要部分合并的结果类型来取得值
            if (m == Mode.PARTIAL2 || m == Mode.FINAL) {
                aggOI = (LongObjectInspector) parameters[0];
            }
            result = new LongWritable(0); // 保存总结果
            return PrimitiveObjectInspectorFactory.writableLongObjectInspector; // 局部合并结果的类型和总合并结果的类型都是long
        }

        // 定义一个AbstractAggregationBuffer类来缓存合并值
        static class CountAgg extends AbstractAggregationBuffer {
            long value;

            // 返回类型占的字节数,long为8
            @Override
            public int estimate() {
                return JavaDataModel.PRIMITIVES2;
            }
        }

        // 创建缓存合并值的buffer
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            CountAgg countAgg = new CountAgg();
            reset(countAgg);
            return countAgg;
        }

        // 重置合并值
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((CountAgg) agg).value = 0;
        }

        // map时执行,迭代数据
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            // parameters为输入数据
            // parameters == null means the input table/split is empty
            if (parameters == null) {
                return;
            }
            if (isAllColumns) {
                ((CountAgg) agg).value++;
            } else {
                boolean countThisRow = true;
                for (Object nextParam : parameters) {
                    if (nextParam == null) {
                        countThisRow = false;
                        break;
                    }
                }
                if (countThisRow) {
                    ((CountAgg) agg).value++;
                }
            }
        }

        // 返回buffer中部分聚合结果,map结束和combiner结束执行
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return terminate(agg);
        }

        // 合并结果,combiner或reduce时执行
        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial != null) {
                ((CountAgg) agg).value += aggOI.get(partial); // 累加部分聚合的结果
            }
        }

        // 返回buffer中总结果,reduce结束执行或者没有reduce时map结束执行
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            result.set(((CountAgg) agg).value); // 每一组执行一次(group by)
            return result; // 返回writable类型
        }
    }
    1、GenericUDAFEvaluator 有一个 嵌套枚举类 Mode
            它表示了UDAF运行在 mapreduce 的哪个阶段
            因为mapreduce不同的阶段可能在不同的机器上执行,需要重新创建UDAF对象,
            而 判断在哪个阶段 就在 GenericUDAFEvaluator.init() 方法中判断 传入的Mode
            理解Mode的含义,就可以理解了hive的UDAF的运行流程。

public static enum Mode {
    /**
     * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
     * 将会调用 iterate()和terminatePartial()
     */
    PARTIAL1,
    /**
     * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据:部分数据聚合
     * 将会调用 merge() 和 terminatePartial() 
     */
    PARTIAL2,
    /**
     * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合 
     * 将会调用 merge()和terminate()
     */
    FINAL,
    /**
     * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
     * 将会调用 iterate()和terminate()
     */
    COMPLETE
};

        一般情况下,完整的UDAF逻辑是一个mapreduce过程,三种情况:

        A、如果有mapper和reducer:

            就会经历 PARTIAL1(mapper),FINAL(reducer),

        B、如果还有combiner:

            那就会经历 PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。

        C、而有一些情况下的mapreduce,只有mapper,而没有reducer:

            所以就会只有 COMPLETE 阶段,这个阶段直接输入原始数据,出结果。

   2、其中聚合的累加结果需要创建一个类保存,继承自 AbstractAggregationBuffer,(AggregationBuffer过时)

    例子:

// 定义一个AbstractAggregationBuffer类来缓存合并值
static class CountAgg extends AbstractAggregationBuffer {
	long value;

	// 返回类型占的字节数,long为8
	@Override
	public int estimate() {
		return JavaDataModel.PRIMITIVES2;
	}
}

// 创建缓存合并值的buffer
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
	CountAgg countAgg = new CountAgg();
	reset(countAgg);
	return countAgg;
}

// 重置合并值
@Override
public void reset(AggregationBuffer agg) throws HiveException {
	((CountAgg) agg).value = 0;
}

    (3)、完整自定义count例子:

package udf;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.LongWritable;

/**
 * 简单的UDAF,写的相当简单的,但因为使用Java反射导致性能损失,而且有些特性不能使用
 * 所以一般开发通用UDAF函数
 * <p>
 * 开发通用UDAF有两个步骤:
 * 第一个是编写resolver类,第二个是编写evaluator类。
 * resolver负责类型检查,操作符重载。
 * evaluator真正实现UDAF的逻辑。
 * 通常来说,顶层UDAF类继承{@link org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2},
 * 里面编写嵌套类 evaluator 实现UDAF的逻辑。
 * <p>
 * resolver通常继承org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2,但是更建议继承AbstractGenericUDAFResolver,隔离将来hive接口的变化。
 * GenericUDAFResolver和GenericUDAFResolver2接口的区别是:
 * 后面的允许evaluator实现利用GenericUDAFParameterInfo可以访问更多的信息,例如DISTINCT限定符,通配符(*)。
 *
 * @author Jeffrey.Deng
 * @date 2017-09-24
 */
public class CountUDAF extends AbstractGenericUDAFResolver {

    private static final Log log = LogFactory.getLog(CountUDAF.class.getName());

    // 构建方法,传入的是函数指定的列
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] params) throws SemanticException {
        if (params.length > 1) {
            throw new UDFArgumentLengthException("Exactly one argument is expected.");
        }
        return new CountUDAFEvaluator();
    }

    // 这个构建方法可以判输入的是参数是*号或者distinct
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        ObjectInspector[] parameters = info.getParameterObjectInspectors();
        boolean isAllColumns = false;
        if (parameters.length == 0) {
            if (!info.isAllColumns()) {
                throw new UDFArgumentException("Argument expected");
            }
            if (info.isDistinct()) {
                throw new UDFArgumentException("DISTINCT not supported with *");
            }
            isAllColumns = true;
        } else if (parameters.length != 1) {
            throw new UDFArgumentLengthException("Exactly one argument is expected.");
        }
        return new CountUDAFEvaluator(isAllColumns);
    }

    /**
     * GenericUDAFEvaluator类实现UDAF的逻辑
     * <p>
     * enum Mode 运行阶段枚举类
     * PARTIAL1:
     * 这个是mapreduce的map阶段:从原始数据到部分数据聚合
     * 将会调用iterate()和terminatePartial()
     * PARTIAL2:
     * 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据:部分数据聚合
     * 将会调用merge() 和 terminatePartial()
     * FINAL:
     * mapreduce的reduce阶段:从部分数据的聚合到完全聚合
     * 将会调用merge()和terminate()
     * COMPLETE:
     * 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
     * 将会调用 iterate()和terminate()
     */
    @SuppressWarnings("deprecation")
    protected static class CountUDAFEvaluator extends GenericUDAFEvaluator {

        private boolean isAllColumns = false;

        private LongObjectInspector aggOI; // 合并结果的类型

        private LongWritable result;

        public CountUDAFEvaluator() {
        }

        public CountUDAFEvaluator(boolean isAllCols) {
            isAllColumns = isAllCols;
        }

        /**
         * 初始化
         *
         * @param m          代表此时在map-reduce哪个阶段,因为不同的阶段可能在不同的机器上执行,需要重新创建对象
         *                   partial1、partial2、final、complete
         * @param parameters partial1或complete阶段传入的parameters类型是原始输入数据的类型
         *                   partial2和final阶段(执行合并)的parameters类型是partial-aggregations(既合并返回结果的类型),此时parameters长度肯定只有1了
         * @return ObjectInspector
         * 在partial1和partial2阶段返回局部合并结果的类型,既terminatePartial的类型
         * 在complete或final阶段返回总结果的类型,既terminate的类型
         */
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            // 当是combiner和reduce阶段时,获取合并结果的类型,因为需要执行merge方法
            // merge方法需要部分合并的结果类型来取得值
            if (m == Mode.PARTIAL2 || m == Mode.FINAL) {
                aggOI = (LongObjectInspector) parameters[0];
            }
            result = new LongWritable(0); // 保存总结果
            return PrimitiveObjectInspectorFactory.writableLongObjectInspector; // 局部合并结果的类型和总合并结果的类型都是long
        }

        // 定义一个AbstractAggregationBuffer类来缓存合并值
        static class CountAgg extends AbstractAggregationBuffer {
            long value;

            // 返回类型占的字节数,long为8
            @Override
            public int estimate() {
                return JavaDataModel.PRIMITIVES2;
            }
        }

        // 创建缓存合并值的buffer
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            CountAgg countAgg = new CountAgg();
            reset(countAgg);
            return countAgg;
        }

        // 重置合并值
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((CountAgg) agg).value = 0;
        }

        // map时执行,迭代数据
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            // parameters为输入数据
            // parameters == null means the input table/split is empty
            if (parameters == null) {
                return;
            }
            if (isAllColumns) {
                ((CountAgg) agg).value++;
            } else {
                boolean countThisRow = true;
                for (Object nextParam : parameters) {
                    if (nextParam == null) {
                        countThisRow = false;
                        break;
                    }
                }
                if (countThisRow) {
                    ((CountAgg) agg).value++;
                }
            }
        }

        // 返回buffer中部分聚合结果,map结束和combiner结束执行
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return terminate(agg);
        }

        // 合并结果,combiner或reduce时执行
        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial != null) {
                ((CountAgg) agg).value += aggOI.get(partial); // 累加部分聚合的结果
            }
        }

        // 返回buffer中总结果,reduce结束执行或者没有reduce时map结束执行
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            result.set(((CountAgg) agg).value); // 每一组执行一次(group by)
            return result; // 返回writable类型
        }
    }
}

    使用:

hive> add jar /root/2017/udf.jar
hive> create temporary function mycount as 'udf.CountUDAF'
hive> select call, mycount(*) as cn from beauty group by call order by cn desc
hive> select tag, mycount(tag) as cn from beauty lateral view explode(tags) lve_beauty as tag group by tag order by cn desc

UDTF

    UDTF(User-Defined Table-Generating Functions)用来解决 输入一行输出多行(On-to-many maping) 的需求。
    限制:
        1、No other expressions are allowed in SELECT 不能和其他字段一起使用
                SELECT pageid, explode(adid_list) AS myCol... is not supported
        2、UDTF's can't be nested 不能嵌套
                SELECT explode(explode(adid_list)) AS myCol... is not supported
        3、GROUP BY / CLUSTER BY / DISTRIBUTE BY / SORT BY is not supported
                SELECT explode(adid_list) AS myCol ... GROUP BY myCol is not supported

继承 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF ,实现 initializeprocessclose 三个方法。

执行过程:

    1、UDTF首先会调用 initialize 方法,此方法返回UDTF的输出行的信息(输出列个数与类型)

    2、初始化完成后,会调用 process 方法,真正的处理过程在process函数中:

        在process中,每一次 forward() 调用产生一行;

        如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到 forward() 函数。

   3、 最后 close() 方法调用,对需要清理的方法进行清理。

下面是实现一个explode函数的例子

    explode会将一个数组中每个元素都输出一行,map中每对key-value都输出一行,实现对数据展开

package udf;

import org.apache.hadoop.hive.ql.exec.TaskExecutionException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author Jeffrey.Deng
 * @date 2017-09-25
 */
public class MyExplodeUDTF extends GenericUDTF {

    private transient ObjectInspector inputOI = null;

    /**
     * 初始化
     * 构建一个StructObjectInspector类型用于输出
     * 其中stuct的字段构成输出的一行。
     * 字段名称不重要,因为它们将被用户提供的列别名覆盖。
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // 以前的initialize(ObjectInspector[] argOIs)方法失效,下面这里是新方法里获取ObjectInspector[]的写法
        List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs(); // 得到结构体的字段
        ObjectInspector[] udtfInputOIs = new ObjectInspector[inputFields.size()];
        for (int i = 0; i < inputFields.size(); i++) {
            udtfInputOIs[i] = inputFields.get(i).getFieldObjectInspector(); // 字段类型
        }

        if (udtfInputOIs.length != 1) {
            throw new UDFArgumentLengthException("explode() takes only one argument");
        }
        List<String> fieldNames = new ArrayList<>();
        List<ObjectInspector> fieldOIs = new ArrayList<>();
        switch (udtfInputOIs[0].getCategory()) {
            case LIST:
                inputOI = udtfInputOIs[0];
                fieldNames.add("col"); // 指定list生成的列名, 可在as后覆写
                fieldOIs.add(((ListObjectInspector) inputOI).getListElementObjectInspector()); // 获取list元素的类型
                break;
            case MAP:
                inputOI = udtfInputOIs[0];
                fieldNames.add("key"); // 指定map中key的生成的列名, 可在as后覆写
                fieldNames.add("value"); // 指定map中value的生成的列名, 可在as后覆写
                fieldOIs.add(((MapObjectInspector) inputOI).getMapKeyObjectInspector()); // 得到map中key的类型
                fieldOIs.add(((MapObjectInspector) inputOI).getMapValueObjectInspector()); // 得到map中value的类型
                break;
            default:
                throw new UDFArgumentException("explode() takes an array or a map as a parameter");
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); // 创建一个Struct类型返回
    }

    private transient Object[] forwardListObj = new Object[1]; // 输出list
    private transient Object[] forwardMapObj = new Object[2]; // 输出map

    // 每行执行一次,输入数据args
    // 每调用forward,输出一行
    @Override
    public void process(Object[] args) throws HiveException {
        switch (inputOI.getCategory()) {
            case LIST:
                ListObjectInspector listOI = (ListObjectInspector) inputOI;
                List<?> list = listOI.getList(args[0]);
                if (list == null) {
                    return;
                }
                for (Object o : list) { // list中每个元素输出一行
                    forwardListObj[0] = o;
                    forward(forwardListObj);
                }
                break;
            case MAP:
                MapObjectInspector mapOI = (MapObjectInspector) inputOI;
                Map<?, ?> map = mapOI.getMap(args[0]);
                if (map == null) {
                    return;
                }
                for (Map.Entry<?, ?> entry : map.entrySet()) { // map中每一对输出一行
                    forwardMapObj[0] = entry.getKey();
                    forwardMapObj[1] = entry.getValue();
                    forward(forwardMapObj);
                }
                break;
            default:
                throw new TaskExecutionException("explode() can only operate on an array or a map");
        }
    }

    @Override
    public void close() throws HiveException {

    }

}

使用:

hive> add jar /root/2017/udf.jar
hive> create temporary function myexplode as 'udf.MyExplodeUDTF'
hive> select myexplode(tags) as tag from beauty
hive> select myexplode(props) as (k,v) from beauty
hive> select tag, count(tag) as cn from beauty lateral view myexplode(tags) lve_beauty as tag group by tag order by cn desc


添加新评论