UDF
User Define Function 用户自定义函数
X 进 一 出 (X针对的是列)
UDF1 一进一出 , UDF2 二进一出, UDF3 三进一出 。。。
实现:
继承 UDFx 函数,实现call方法
注册函数:调用 udf().register() 方法 , 传入自定义的函数名,自定义的UDFx函数对象,返回类型
sqlContext.udf().register(udfName : String, udf: UDFx , returnType: DataType)
使用:sqlContext.sql()
sqlContext.sql(" select udfName(param1, param2....) from tableName ")
例子:
获取传入字段的子符串的长度
public class UDF {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("UDF").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
List<String> list = new ArrayList<String>();
list.add("yarn");
list.add("Marry");
list.add("Jack");
list.add("To m");
list.add("Tom");
JavaRDD<String> nameRdd = sc.parallelize(list);
JavaRDD<Row> rowRdd = nameRdd.map(new Function<String, Row>() {
@Override
public Row call(String name) throws Exception {
return RowFactory.create(name);
}
});
ArrayList<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType structType = DataTypes.createStructType(structFields);
DataFrame nameDF = sqlContext.createDataFrame(rowRdd, structType);
nameDF.registerTempTable("nameTable");
/**
* 根据UDF函数参数的个数来决定是实现哪一个UDF UDF1,UDF2。。。。UDF1xxx
*/
sqlContext.udf().register("strLen", new UDF1<String,Integer>() {
@Override
public Integer call(String str) throws Exception {
return str.length();
}
}, DataTypes.IntegerType);
/*
sqlContext.udf().register("strLen", new UDF2<String, Integer, String>() {
@Override
public String call(String t1, Integer t2) throws Exception {
Random random = new Random();
return t1.length()+random.nextInt(t2)+"~";
}
}, DataTypes.StringType);
*/
sqlContext.sql("SELECT name,strLen(name,10) from nameTable").show();
}
}
UDAF
User Defined Aggregation Function 用户自定义聚合函数
函数本身作用于数据集合,类似数据库中的聚合函数,是针对一列的所有行数据,对之实施聚合。
实现:
继承 UserDefinedAggregateFunction 类,实现其中方法
注册函数:调用 udf().register() 方法 , 传入自定义的函数名,自定义的UDAF函数对象
sqlContext.udf().register(udafName : String, udaf: UserDefinedAggregateFunction)
使用:sqlContext.sql()
sqlContext.sql(" select udafName(param) from tableName ")
例子:
统计某字段的行数
public class UDAF { public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("UDAF").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(sparkConf); SQLContext sqlContext = new SQLContext(sc); List<String> list = new ArrayList<String>(); list.add("yarn"); list.add("Marry"); list.add("Jack"); list.add("Tom"); list.add("Tom"); /** * SELECT name,udaf(name) FROM nameTable GROUP BY name; */ JavaRDD<String> nameRdd = sc.parallelize(list); JavaRDD<Row> rowRdd = nameRdd.map(new Function<String, Row>() { @Override public Row call(String name) throws Exception { return RowFactory.create(name); } }); ArrayList<StructField> structFields = new ArrayList<StructField>(); structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); StructType structType = DataTypes.createStructType(structFields); // 创建dataFrame DataFrame nameDF = sqlContext.createDataFrame(rowRdd, structType); //注册表 nameDF.registerTempTable("nameTable"); //注册函数 sqlContext.udf().register("stringCount", new UserDefinedAggregateFunction() { // 指定输入数据的字段与类型 @Override public StructType inputSchema() { return DataTypes.createStructType( Arrays.asList(DataTypes.createStructField("str", DataTypes.StringType, true))); } // 聚合操作时,所处理的数据的类型 // 即 buffer中数据的schema,buffer就是一个Row @Override public StructType bufferSchema() { return DataTypes.createStructType( Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, true)) ); }
/** * buffer就是一个Row * buffer初始化 可以认为是,你自己在内部指定一个初始的值 */ @Override public void initialize(MutableAggregationBuffer buffer) { // 0号位置 值为0 buffer.update(0, 0); } /** * whether given the same input, * always return the same output * true: yes */ @Override public boolean deterministic() { return true; } // 最终函数返回值的类型 @Override public DataType dataType() { return DataTypes.IntegerType; } // 最终返回一个的聚合值 // 要和dataType的类型一一对应 @Override public Object evaluate(Row row) { // 返回 0号 位置值 return row.getInt(0); } /** * 更新 可以认为是,一个一个地将组内的字段值传递进来 实现拼接的逻辑 * buffer.getInt(0)获取的是上一次聚合后的值 * 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合 * 大聚和发生在reduce端 */ @Override public void update(MutableAggregationBuffer buffer, Row row) { // 更新0号位置值 buffer.update(0, buffer.getInt(0) + 1); } /** * 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理 * 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来 * buffer1.getInt(0) : 大聚和的时候 上一次聚合后的值 * buffer2.getInt(0) : 这次计算传入进来的update的结果 */ @Override public void merge(MutableAggregationBuffer buffer1, Row buffer2) { buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0)); } }); sqlContext.sql("SELECT name,stringCount(name) from nameTable group by name").show(); sc.close(); } }
开窗函数
如果SQL语句里面使用到了开窗函数,那么这个SQL语句必须使用 HiveContext 来执行,HiveContext默认情况下在本地无法创建
一般 分组取TopN 时,用到开窗函数
row_number() 开窗函数的作用:
按照我们每一个分组的数据,按其照顺序,打上一个分组内的行号
语法: row_number() OVER (PARTITION BY 分组类名 ORDER BY 排序列名 DESC) 结果别名(列名)
/**
* row_number()开窗函数
* @author Jeffrey.deng
* groupTopN
*
* 如果SQL语句里面使用到了开窗函数,那么这个SQL语句必须使用HiveContext来执行,HiveContext默认情况下在本地无法创建
*/
public class RowNumberWindowFunction {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("RowNumberWindowFunction");
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(sc.sc());
// 通过hiveContext操作hive数据库 删除已经存在的表,创建新表,并且加载数据
hiveContext.sql("DROP TABLE IF EXISTS sales");
hiveContext.sql("CREATE TABLE IF NOT EXISTS sales ("
+ "product STRING,"
+ "category STRING,"
+ "revenue BIGINT) row format delimited fields terminated by '\t'");
hiveContext.sql("LOAD DATA "
+ "LOCAL INPATH '/root/resource/sales.txt' "
+ "INTO TABLE sales");
/**
* row_number()开窗函数的作用:按照我们每一个分组的数据,按其照顺序,打上一个分组内的行号
* id=2016 [111,112,113]
* 那么对这个分组的每一行使用row_number()开窗函数后,三行数据会一次得到一个组内的行号
* id=2016 [111 1,112 2,113 3]
*/
DataFrame top3SalesDF = hiveContext.sql(""
+ "SELECT product,category,revenue "
+ "FROM ("
+ "SELECT "
+ "product,"
+ "category,"
+ "revenue,"
+ "row_number() OVER (PARTITION BY category ORDER BY revenue DESC) rank "
+ "FROM sales "
+ ") tmp_sales "
+ "WHERE rank <= 3");
// 将每组排名前3的数据,保存到一个表中
hiveContext.sql("USE result");
hiveContext.sql("DROP TABLE IF EXISTS top3Sales");
top3SalesDF.write().saveAsTable("top3Sales");
sc.close();
}
}