Grouped TopN:
Group with groupByKey, then for each key iterate over its values, collect them into a List and call Collections.sort(list).
What is wrong with that? A single key may have a very large number of values, and pulling them all into a List risks an OOM.
To avoid the memory blow-up, keep a fixed-size array holding the current TopN: for each row, check whether its value beats one already in the array and, if so, insert it at that position.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
* Get the two records with the highest temperature for each month.
* 1949-10-01 14:21:02 34c
* 1949-10-01 19:21:02 38c
* 1949-10-02 14:01:02 36c
* 1950-01-01 11:21:02 32c
* 1950-10-01 12:21:02 37c
*
* Created by Jeffrey.Deng on 2017/9/14.
*/
public class WeatherTopN implements Serializable {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("TopN");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> rdd = sc.textFile("G:\\Recv Files\\14Spark\\01-Core\\day04\\weather");
rdd = rdd.cache();
List<Tuple2<String, List<Tuple2<String, Double>>>> list = rdd.mapToPair(new PairFunction<String, String, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Tuple2<String, Double>> call(String log) throws Exception {
String date = log.substring(0, 7); // key by month, e.g. "1949-10"
Double temperature = Double.parseDouble(log.split("\t")[1].replace("c", ""));
// { date: [log,temperature] }
return new Tuple2<>(date, new Tuple2<>(log, temperature));
}
// group by date (month)
}).groupByKey().mapToPair(new PairFunction<Tuple2<String, Iterable<Tuple2<String, Double>>>, String, List<Tuple2<String, Double>>>() {
@Override
public Tuple2<String, List<Tuple2<String, Double>>> call(Tuple2<String, Iterable<Tuple2<String, Double>>> t) throws Exception {
String date = t._1();
Iterator<Tuple2<String, Double>> iterator = t._2().iterator();
/**
* Top-N optimization:
* keep a fixed-size array holding the current TopN; for each row, check whether it
* beats a stored value and, if so, insert it at that position.
*/
Tuple2<String, Double>[] arr = new Tuple2[2];
while (iterator.hasNext()) {
Tuple2<String, Double> next = iterator.next();
Double temperature = next._2();
for (int i = 0; i < arr.length; i++) {
Tuple2<String, Double> curr = arr[i];
if (curr == null) { // empty slot: store the record directly
arr[i] = next;
break;
} else if (temperature > curr._2()) { // hotter than the record at this slot
for (int j = arr.length - 1; j > i; j--) { // shift the remaining entries one slot back
arr[j] = arr[j - 1];
}
arr[i] = next; // put the new record at this slot
break;
}
}
}
return new Tuple2<>(date, Arrays.asList(arr));
}
}).collect();
for (Tuple2<String, List<Tuple2<String, Double>>> stringListTuple2 : list) {
System.out.println(stringListTuple2.toString());
}
sc.stop();
}
}
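Note that groupByKey still shuffles every record of a hot key to a single task, even though the array only keeps two of them. If that is a concern, one possible alternative (not part of the original notes, just a sketch) is aggregateByKey, which folds records into a bounded top-2 accumulator both before and after the shuffle. The snippet below assumes the same (date, (log, temperature)) pairs produced by the mapToPair step above; the class, method and variable names (WeatherTopNAgg, topTwoPerKey, pairRdd) are made up for illustration.
import java.util.ArrayList;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;
public class WeatherTopNAgg {
    // pairRdd is assumed to be the (date, (log, temperature)) pair RDD built by the mapToPair step above
    public static JavaPairRDD<String, ArrayList<Tuple2<String, Double>>> topTwoPerKey(
            JavaPairRDD<String, Tuple2<String, Double>> pairRdd) {
        return pairRdd.aggregateByKey(
                new ArrayList<Tuple2<String, Double>>(),              // zero value: empty running top-2
                (acc, record) -> {                                    // seqOp: fold one record into the top-2
                    acc.add(record);
                    acc.sort((a, b) -> Double.compare(b._2(), a._2())); // sort descending by temperature
                    while (acc.size() > 2) {                          // drop everything beyond the two largest
                        acc.remove(acc.size() - 1);
                    }
                    return acc;
                },
                (left, right) -> {                                    // combOp: merge two partial top-2 lists
                    left.addAll(right);
                    left.sort((a, b) -> Double.compare(b._2(), a._2()));
                    while (left.size() > 2) {
                        left.remove(left.size() - 1);
                    }
                    return left;
                });
    }
}
Because each accumulator never holds more than two entries, memory per key stays bounded no matter how many rows the key has; the trade-off is the extra sort on every fold, which is cheap for a two- or three-element list.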
Secondary sort:
Wrap the first field and the second field in a custom object, and use that object as the key.
The custom secondary-sort key class must be serializable, because keys are shipped over the network.
It must implement the Comparable interface.
Sort with sortByKey.
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
/**
* Data:
* 6 5
* 8 123
* 1 4
* 4 123
* 5 432
* Approach:
* 1. Wrap the first field and the second field in a custom object and use it as the key.
* 2. Sort with sortByKey.
*
* @author Jeffrey.Deng
*/
public class SecondarySortOps {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setMaster("local")
.setAppName("TopOps");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> linesRDD = sc.textFile("secondSort.txt");
JavaPairRDD<SecondSortKey, String> pairRDD = linesRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {
@Override
public Tuple2<SecondSortKey, String> call(String val) throws Exception {
// wrap both fields in the custom key object
String[] fields = val.split(" ");
SecondSortKey secondSortKey = new SecondSortKey(Integer.valueOf(fields[0]), Integer.valueOf(fields[1]));
return new Tuple2<SecondSortKey, String>(secondSortKey, val);
}
});
JavaPairRDD<SecondSortKey, String> sortByKey = pairRDD.sortByKey();
List<Tuple2<SecondSortKey, String>> collect = sortByKey.collect();
for (Tuple2<SecondSortKey, String> tuple2 : collect) {
System.out.println(tuple2._2);
}
sc.stop();
}
}
Custom key class:
import java.io.Serializable;
/**
* 1. The custom secondary-sort key class must be serializable, because keys are shipped over the network.
* 2. It must implement the Comparable interface.
* @author Jeffrey.Deng
*
*/
public class SecondSortKey implements Serializable,Comparable<SecondSortKey> {
private static final long serialVersionUID = 1L;
private Integer first;
private Integer second;
public Integer getFirst() {
return first;
}
public void setFirst(Integer first) {
this.first = first;
}
public Integer getSecond() {
return second;
}
public void setSecond(Integer second) {
this.second = second;
}
public SecondSortKey(Integer first, Integer second) {
super();
this.first = first;
this.second = second;
}
@Override
public int compareTo(SecondSortKey that) {
// compare on the first field, then fall back to the second; Integer.compare avoids int overflow
if (getFirst().equals(that.getFirst())) {
return Integer.compare(getSecond(), that.getSecond());
} else {
return Integer.compare(getFirst(), that.getFirst());
}
}
@Override
public String toString() {
return "First:" + getFirst() + "\tSecond:" + getSecond();
}
}
Scala version:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
object SecondSort {
def main(args: Array[String]): Unit = {
val sconf = new SparkConf().setAppName("SecondSort").setMaster("local")
val sc = new SparkContext(sconf)
val lines = sc.textFile("secondSort.txt")
val pairs = lines.map { x => (new SecondSortKey(x.split(" ")(0).toInt, x.split(" ")(1).toInt), x) }
val sortedPairs = pairs.sortByKey(false)
sortedPairs.map(_._2).foreach(println)
}
}
/**
* Extends Ordered and Serializable.
*
* @param first
* @param second
*/
class SecondSortKey(val first: Int, val second: Int) extends Ordered[SecondSortKey] with Serializable {
// compare on first, then fall back to second; Integer.compare avoids int overflow on extreme values
def compare(that: SecondSortKey): Int = {
if (this.first == that.first)
Integer.compare(this.second, that.second)
else
Integer.compare(this.first, that.first)
}
}
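One small difference between the two drivers: the Java version above calls sortByKey() with the default ascending order, while this Scala version passes false to sort descending. If the Java version should also print largest-first, JavaPairRDD.sortByKey takes the same boolean flag; a one-line sketch reusing the pairRDD variable from SecondarySortOps:
JavaPairRDD<SecondSortKey, String> sortByKey = pairRDD.sortByKey(false); // false = sort descending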