SparkSQL UDF、UDAF、开窗函数

UDF 

    User Define Function 用户自定义函数

    X 进 一 出 (X针对的是列)

            UDF1 一进一出 , UDF2 二进一出, UDF3 三进一出 。。。

    实现:

            继承 UDFx 函数,实现call方法

            注册函数:调用 udf().register() 方法 , 传入自定义的函数名,自定义的UDFx函数对象,返回类型

                sqlContext.udf().register(udfName : String, udf: UDFx , returnType: DataType)

            使用:sqlContext.sql()

                sqlContext.sql(" select udfName(param1, param2....) from tableName ")

    例子:

            获取传入字段的子符串的长度

public class UDF {
	public static void main(String[] args) {
		SparkConf sparkConf = new SparkConf().setAppName("UDF").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(sparkConf);
		
		SQLContext sqlContext = new SQLContext(sc);
		
		List<String> list = new ArrayList<String>();
		list.add("yarn");
		list.add("Marry");
		list.add("Jack");
		list.add("To	m");
		list.add("Tom");
		JavaRDD<String> nameRdd = sc.parallelize(list);

		JavaRDD<Row> rowRdd = nameRdd.map(new Function<String, Row>() {

			@Override
			public Row call(String name) throws Exception {
				return RowFactory.create(name);
			}
		});
		ArrayList<StructField> structFields = new ArrayList<StructField>();
		structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
		StructType structType = DataTypes.createStructType(structFields);
		
		DataFrame nameDF = sqlContext.createDataFrame(rowRdd, structType);
		
		nameDF.registerTempTable("nameTable");
		
		 /**
		  * 根据UDF函数参数的个数来决定是实现哪一个UDF  UDF1,UDF2。。。。UDF1xxx
		  */

		sqlContext.udf().register("strLen", new UDF1<String,Integer>() {
			@Override
			public Integer call(String str) throws Exception {
				return str.length();
			}
		}, DataTypes.IntegerType);

		/*
		sqlContext.udf().register("strLen", new UDF2<String, Integer, String>() {

			@Override
			public String call(String t1, Integer t2) throws Exception {
				Random random = new Random();
				return t1.length()+random.nextInt(t2)+"~";
			}
		}, DataTypes.StringType);
		 */
		
		sqlContext.sql("SELECT name,strLen(name,10) from nameTable").show();
		
	}
}


UDAF

    User Defined Aggregation Function 用户自定义聚合函数

    函数本身作用于数据集合,类似数据库中的聚合函数,是针对一列的所有行数据,对之实施聚合。

    实现:

            继承 UserDefinedAggregateFunction 类,实现其中方法

            注册函数:调用 udf().register() 方法 , 传入自定义的函数名,自定义的UDAF函数对象

                sqlContext.udf().register(udafName : String, udaf: UserDefinedAggregateFunction)

            使用:sqlContext.sql()

                sqlContext.sql(" select udafName(param) from tableName ")

    例子:

            统计某字段的行数

public class UDAF {
	public static void main(String[] args) {
		SparkConf sparkConf = new SparkConf().setAppName("UDAF").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(sparkConf);
		SQLContext sqlContext = new SQLContext(sc);

		List<String> list = new ArrayList<String>();
		list.add("yarn");
		list.add("Marry");
		list.add("Jack");
		list.add("Tom");
		list.add("Tom");

		/**
		 * SELECT name,udaf(name)  FROM nameTable GROUP BY name;
		 */
		JavaRDD<String> nameRdd = sc.parallelize(list);
		JavaRDD<Row> rowRdd = nameRdd.map(new Function<String, Row>() {

			@Override
			public Row call(String name) throws Exception {
				return RowFactory.create(name);
			}
		});
		ArrayList<StructField> structFields = new ArrayList<StructField>();
		structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
		StructType structType = DataTypes.createStructType(structFields);
		// 创建dataFrame
		DataFrame nameDF = sqlContext.createDataFrame(rowRdd, structType);

		//注册表
		nameDF.registerTempTable("nameTable");

		//注册函数
		sqlContext.udf().register("stringCount", new UserDefinedAggregateFunction() {

			// 指定输入数据的字段与类型
			@Override
			public StructType inputSchema() {
				return DataTypes.createStructType(
						Arrays.asList(DataTypes.createStructField("str", DataTypes.StringType, true)));
			}

			// 聚合操作时,所处理的数据的类型 
			// 即 buffer中数据的schema,buffer就是一个Row
			@Override
			public StructType bufferSchema() {
				return DataTypes.createStructType(
						Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, true)) );
			}

			/**
			 * buffer就是一个Row
			 * buffer初始化 可以认为是,你自己在内部指定一个初始的值
			 */
			@Override
			public void initialize(MutableAggregationBuffer buffer) {
				// 0号位置 值为0
				buffer.update(0, 0);
			}

			/**
			 * whether given the same input,
			 * always return the same output
			 * true: yes
			 */
			@Override
			public boolean deterministic() {
				return true;
			}

			// 最终函数返回值的类型
			@Override
			public DataType dataType() {
				return DataTypes.IntegerType;
			}

			// 最终返回一个的聚合值
			// 要和dataType的类型一一对应
			@Override
			public Object evaluate(Row row) {
				// 返回 0号 位置值
				return row.getInt(0);
			}

			/**
			 * 更新 可以认为是,一个一个地将组内的字段值传递进来 实现拼接的逻辑
			 * buffer.getInt(0)获取的是上一次聚合后的值
			 * 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合 
			 *    大聚和发生在reduce端
			 */
			@Override
			public void update(MutableAggregationBuffer buffer, Row row) {
				// 更新0号位置值
				buffer.update(0, buffer.getInt(0) + 1);
			}

			/**
			 * 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理
			 * 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来
			 * buffer1.getInt(0) : 大聚和的时候 上一次聚合后的值       
			 * buffer2.getInt(0) : 这次计算传入进来的update的结果
			 */
			@Override
			public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
				buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
			}

		});

		sqlContext.sql("SELECT name,stringCount(name) from nameTable group by name").show();

		sc.close();

	}
}


开窗函数

    如果SQL语句里面使用到了开窗函数,那么这个SQL语句必须使用 HiveContext 来执行,HiveContext默认情况下在本地无法创建 

    一般 分组取TopN 时,用到开窗函数

    row_number()  开窗函数的作用:

              按照我们每一个分组的数据,按其照顺序,打上一个分组内的行号

              语法:  row_number() OVER (PARTITION BY 分组类名 ORDER BY 排序列名 DESC)  结果别名(列名)

/**
 * row_number()开窗函数
 * @author Jeffrey.deng
 * groupTopN
 * 
 * 如果SQL语句里面使用到了开窗函数,那么这个SQL语句必须使用HiveContext来执行,HiveContext默认情况下在本地无法创建
 */
public class RowNumberWindowFunction {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setAppName("RowNumberWindowFunction");
		JavaSparkContext sc = new JavaSparkContext(conf);
		HiveContext hiveContext = new HiveContext(sc.sc());
		
		// 通过hiveContext操作hive数据库 删除已经存在的表,创建新表,并且加载数据
		hiveContext.sql("DROP TABLE IF EXISTS sales");
		hiveContext.sql("CREATE TABLE IF NOT EXISTS sales ("
				+ "product STRING,"
				+ "category STRING,"
				+ "revenue BIGINT) row format delimited fields terminated by '\t'");  
		hiveContext.sql("LOAD DATA "
				+ "LOCAL INPATH '/root/resource/sales.txt' "
				+ "INTO TABLE sales");
		/**
		 * row_number()开窗函数的作用:按照我们每一个分组的数据,按其照顺序,打上一个分组内的行号
		 * id=2016 [111,112,113]
		 * 那么对这个分组的每一行使用row_number()开窗函数后,三行数据会一次得到一个组内的行号
		 * id=2016 [111 1,112 2,113 3]
		 */
		
		DataFrame top3SalesDF = hiveContext.sql(""
				+ "SELECT product,category,revenue "
				+ "FROM ("
					+ "SELECT "
						+ "product,"
						+ "category,"
						+ "revenue,"
						+ "row_number() OVER (PARTITION BY category ORDER BY revenue DESC) rank "
					+ "FROM sales "  
				+ ") tmp_sales "
				+ "WHERE rank <= 3");
		
		// 将每组排名前3的数据,保存到一个表中
		hiveContext.sql("USE result");
		hiveContext.sql("DROP TABLE IF EXISTS top3Sales");  
		top3SalesDF.write().saveAsTable("top3Sales");
		sc.close();
	}
}


添加新评论