分组取topN:
        使用groupByKey分组,
        然后遍历每个key,
          List  Collections.sort(list) 
  有什么问题?  某一个key对应的value 有可能非常非常的多,放到list里面会有OOM的风险
        防止内存溢出,定义一个数组存储TopN,每次遍历一行,判断该行值是否大于数组内的值,大于则替换。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * 获取每个月的最高温度的两条数据
 * 1949-10-01 14:21:02 34c
 * 1949-10-01 19:21:02 38c
 * 1949-10-02 14:01:02 36c
 * 1950-01-01 11:21:02 32c
 * 1950-10-01 12:21:02 37c
 *
 * Created by Jeffrey.Deng on 2017/9/14.
 */
public class WeatherTopN  implements Serializable {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("TonN");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> rdd = sc.textFile("G:\\Recv Files\\14Spark\\01-Core\\day04\\weather");
        rdd = rdd.cache();
        List<Tuple2<String, List<Tuple2<String, Double>>>> list = rdd.mapToPair(new PairFunction<String, String, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Tuple2<String, Double>> call(String log) throws Exception {
                String date = log.substring(0, 7);
                Double temperature = Double.parseDouble(log.split("\t")[1].replace("c", ""));
                // { date: [log,temperature] }
                return new Tuple2<>(date, new Tuple2<>(log, temperature));
            }
            // 根据date分组
        }).groupByKey().mapToPair(new PairFunction<Tuple2<String, Iterable<Tuple2<String, Double>>>, String, List<Tuple2<String, Double>>>() {
            @Override
            public Tuple2<String, List<Tuple2<String, Double>>> call(Tuple2<String, Iterable<Tuple2<String, Double>>> t) throws Exception {
                String date = t._1();
                Iterator<Tuple2<String, Double>> iterator = t._2().iterator();

                /**
                 *  求 TopN 的优化
                 *  定义一个数组存储TopN,每遍历一行,判断是否大于数组内的值,大于就替换
                 */
                Tuple2<String, Double>[] arr = new Tuple2[2];
                while (iterator.hasNext()) {
                    Tuple2<String, Double> next = iterator.next();
                    Double temperature = next._2();
                    for (int i = 0; i < arr.length; i++) {
                        Tuple2<String, Double> curr = arr[i];
                        if (curr == null) {  // 数组内没值,就直接保存该值
                            arr[i] = next;
                            break;
                        } else if (temperature > curr._2()) {  //如果大于数组内该位置的值
                            for (int j = arr.length - 1; j > i; j--) {  // 剩下位置的往后移动
                                arr[j] = arr[j - 1];
                            }
                            arr[i] = next;  //该位置的值设置
                            break;
                        }
                    }
                }
                return new Tuple2<>(date, Arrays.asList(arr));
            }
        }).collect();

        for (Tuple2<String, List<Tuple2<String, Double>>> stringListTuple2 : list) {
            System.out.println(stringListTuple2.toString());
        }

        sc.stop();
    }

}



二次排序:
        将 第一个字段和第二个字段  封装到 一个自定义的对象,自定义的对象应该作为Key
                    自定义的二次排序的类 必须是可序列化  的, 进行网络的传输
                    必须实现 Comparable  接口
        使用 sortByKey进行排序
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**
 * 数据:
 * 6 5
 * 8 123
 * 1 4
 * 4 123
 * 5 432
 * 思路:
 * 1、将第一个字段和第二个字段封装到一个自定义的对象,自定义的对象应该作为Key
 * 2、使用sortByKey进行排序
 *
 * @author Jeffrey.Deng
 */
public class SecondarySortOps {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setMaster("local")
				.setAppName("TopOps");
		JavaSparkContext sc = new JavaSparkContext(conf);
		JavaRDD<String> linesRDD = sc.textFile("secondSort.txt");

		JavaPairRDD<SecondSortKey, String> pairRDD = linesRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {

			@Override
			public Tuple2<SecondSortKey, String> call(String val) throws Exception {
				// 封装到自定义的对象
				SecondSortKey secondSortKey = new SecondSortKey(Integer.valueOf(val.split(" ")[0]), Integer.valueOf(val.split(" ")[1]));
				return new Tuple2<SecondSortKey, String>(secondSortKey, val);
			}
		});

		JavaPairRDD<SecondSortKey, String> sortByKey = pairRDD.sortByKey();
		List<Tuple2<SecondSortKey, String>> collect = sortByKey.collect();

		for (Tuple2<SecondSortKey, String> tuple2 : collect) {
			System.out.println(tuple2._2);
		}

		sc.stop();
	}
}
自定义Key类:
import java.io.Serializable;

/**
 * 1、自定义的二次排序的类 必须是可序列化的。 进行网络的传输
 * 2、必须实现Comparable接口
 * @author Jeffrey.Deng
 *
 */
public class SecondSortKey implements Serializable,Comparable<SecondSortKey> {
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	private Integer first;
	private Integer second;
	public Integer getFirst() {
		return first;
	}
	public void setFirst(Integer first) {
		this.first = first;
	}
	public Integer getSecond() {
		return second;
	}
	public void setSecond(Integer second) {
		this.second = second;
	}
	public SecondSortKey(Integer first, Integer second) {
		super();
		this.first = first;
		this.second = second;
	}
	@Override
	public int compareTo(SecondSortKey that) {
		if(getFirst() - that.getFirst() == 0 ){
			return getSecond() - that.getSecond();
		}else{
			return getFirst() - that.getFirst();
		}
	}
	@Override
	public String toString() {
		return "First:" + getFirst() + "\tSecond:" + getSecond();
	}
}

Scala版本:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions

object SecondSort {
  def main(args: Array[String]): Unit = {

    val sconf = new SparkConf().setAppName("SecondSort").setMaster("local")
    val sc = new SparkContext(sconf)
    val lines = sc.textFile("secondSort.txt")

    val pairs = lines.map { x => (new SecondSortKey(x.split(" ")(0).toInt, x.split(" ")(1).toInt), x) }
    val sortedPairs = pairs.sortByKey(false)
    sortedPairs.map(_._2).foreach {
      println
    }
  }
}

/**
  * 继承 Ordered ,Serializable
  *
  * @param first
  * @param second
  */
class SecondSortKey(val first: Int, val second: Int) extends Ordered[SecondSortKey] with Serializable {
  def compare(that: SecondSortKey): Int = {
    if (this.first - that.first == 0)
      this.second - that.second
    else
      this.first - that.first
  }
}

添加新评论