Join

        Joins two pair RDDs: entries with the same key are combined, and the two corresponding values are wrapped in a tuple.

        Implemented on top of cogroup.

Signature:    join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

Variants:

        join: inner join

        fullOuterJoin: full outer join

        leftOuterJoin: left outer join

        rightOuterJoin: right outer join

Source:

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = self.withScope {
    join(other, new HashPartitioner(numPartitions))
  }

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
   */
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
    )
  }

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
    cg.mapValues { case Array(vs, w1s) =>
      (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
    }
  }

Example:

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Run {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val sqlContext = new SQLContext(sc)

    /**
      * id      name
      * 1       zhangsan
      * 2       lisi
      * 3       wangwu
      */
    val idName = sc.parallelize(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu")))

    /**
      * id      age
      * 1       30
      * 2       29
      * 4       21
      */
    val idAge = sc.parallelize(Array((1, 30), (2, 29), (4, 21)))

    /** *******************************RDD **********************************/

    println("*********************************RDD**********************************")

    println("\n内关联(inner join)\n")
    // 内关联(inner join)
    //  只保留两边id相等的部分
    /**
      * (1,(zhangsan,30))
      * (2,(lisi,29))
      */
    idName.join(idAge).collect().foreach(println)

    println("\n左外关联(left out join)\n")
    // 左外关联(left out join)
    // 以左边的数据为标准, 左边的数据一律保留
    // 右边分三情况:
    //      一: 左边的id, 右边有, 则合并数据; (1,(zhangsan,Some(30)))
    //      二: 左边的id, 右边没有, 则右边为空; (3,(wangwu,None))
    //      三: 右边的id, 左边没有, 则不保留; 右边有id为4的行, 但结果中并未保留
    /**
      * (1,(zhangsan,Some(30)))
      * (2,(lisi,Some(29)))
      * (3,(wangwu,None))
      */
    idName.leftOuterJoin(idAge).collect().foreach(println)

    println("\n右外关联(right outer join)\n")
    // 右外关联(right outer join)
    // 以右边的数据为标准, 右边的数据一律保留
    // 左边分三种情况:
    //      一: 右边的id, 左边有, 则合并数据; (1,(Some(zhangsan),30))
    //      二: 右边的id, 左边没有, 则左边为空; (4,(None,21))
    //      三: 左边的id, 右边没有, 则不保留; 左边有id为3的行, 但结果中并为保留
    /**
      * (1,(Some(zhangsan),30))
      * (2,(Some(lisi),29))
      * (4,(None,21))
      */
    idName.rightOuterJoin(idAge).collect().foreach(println)

    println("\n全外关联(full outer join)\n")
    // 全外关联(full outer join)
    /**
      *
      * (1,(Some(zhangsan),Some(30)))
      * (2,(Some(lisi),Some(29)))
      * (3,(Some(wangwu),None))
      * (4,(None,Some(21)))
      */
    idName.fullOuterJoin(idAge).collect().foreach(println)

    /** *******************************DataFrame **********************************/
    val schema1 = StructType(Array(StructField("id", DataTypes.IntegerType, nullable = true), StructField("name", DataTypes.StringType, nullable = true)))
    val idNameDF = sqlContext.createDataFrame(idName.map(t => Row(t._1, t._2)), schema1)

    val schema2 = StructType(Array(StructField("id", DataTypes.IntegerType, nullable = true), StructField("age", DataTypes.IntegerType, nullable = true)))
    val idAgeDF = sqlContext.createDataFrame(idAge.map(t => Row(t._1, t._2)), schema2)
    println("*********************************DataFrame**********************************")

    println("\n内关联(inner join)\n")
    // 相当于调用, idNameDF.join(idAgeDF, Seq("id"), "inner").collect().foreach(println)
    // 这里只是调用了封装的API
    idNameDF.join(idAgeDF, "id").collect().foreach(println)

    println("\n左外关联(left out join)\n")
    idNameDF.join(idAgeDF, Seq("id"), "left_outer").collect().foreach(println)

    println("\n右外关联(right outer join)\n")
    idNameDF.join(idAgeDF, Seq("id"), "right_outer").collect().foreach(println)

    println("\n全外关联(full outer join)\n")
    idNameDF.join(idAgeDF, Seq("id"), "outer").collect().foreach(println)

    println("\nleft semi join\n")
    // left semi join
    // 左边的id, 在右边有, 就保留左边的数据; 右边的数据不保留, 只有id的有意义的
    /**
      * [1,zhangsan]
      * [2,lisi]
      */
    idNameDF.join(idAgeDF, Seq("id"), "leftsemi").collect().foreach(println)
  }

}

Difference between cogroup and join

    When duplicate keys appear, join produces a per-key Cartesian product of the values, whereas cogroup gathers the values of each side into an Iterable.
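
A minimal sketch of that per-key Cartesian product, assuming a local SparkContext named sc as in the examples (the full example below then compares cogroup with fullOuterJoin):

// both sides contain key 2 twice
val left  = sc.parallelize(Seq((2, "a"), (2, "b")))
val right = sc.parallelize(Seq((2, 10), (2, 20)))

// join: every value combination per key -> four rows for key 2,
// (2,(a,10)), (2,(a,20)), (2,(b,10)), (2,(b,20)) in some order
left.join(right).collect().foreach(println)

// cogroup: one row per key, the values of each side gathered into an Iterable
// (2,(CompactBuffer(a, b),CompactBuffer(10, 20)))
left.cogroup(right).collect().foreach(println)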

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object Run {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("test").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val sqlContext = new SQLContext(sc)

    /**
      * id      name
      * 1       zhangsan
      * 2       lisi
      * 3       wangwu
      */
    val idName = sc.parallelize(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu")))

    /**
      * id      age
      * 1       30
      * 2       29
      * 4       21
      */
    val idAge = sc.parallelize(Array((1, 30), (2, 29), (4, 21)))

    println("\ncogroup\n")

    /**
      * (1,(CompactBuffer(zhangsan),CompactBuffer(30)))
      * (2,(CompactBuffer(lisi),CompactBuffer(29)))
      * (3,(CompactBuffer(wangwu),CompactBuffer()))
      * (4,(CompactBuffer(),CompactBuffer(21)))
      */
    idName.cogroup(idAge).collect().foreach(println)

    println("\njoin\n")
    // fullOuterJoin于cogroup的结果类似, 只是数据结构不一样
    /**
      * (1,(Some(zhangsan),Some(30)))
      * (2,(Some(lisi),Some(29)))
      * (3,(Some(wangwu),None))
      * (4,(None,Some(21)))
      */
    idName.fullOuterJoin(idAge).collect().foreach(println)

    /**
      * id      score
      * 1       100
      * 2       90
      * 2       95
      */
    val idScore = sc.parallelize(Array((1, 100), (2, 90), (2, 95)))

    println("\ncogroup, 出现相同id时\n")

    /**
      * (1,(CompactBuffer(zhangsan),CompactBuffer(100)))
      * (2,(CompactBuffer(lisi),CompactBuffer(90, 95)))
      * (3,(CompactBuffer(wangwu),CompactBuffer()))
      */
    idName.cogroup(idScore).collect().foreach(println)

    println("\njoin, 出现相同id时\n")

    /**
      * (1,(Some(zhangsan),Some(100)))
      * (2,(Some(lisi),Some(90)))
      * (2,(Some(lisi),Some(95)))
      * (3,(Some(wangwu),None))
      */
    idName.fullOuterJoin(idScore).collect().foreach(println)
  }

}


mapPartitions

        map: processes one record at a time

        mapPartitions: processes one partition at a time; commonly used for batched database writes, so that a connection is created once per partition instead of once per record

        Caveat

                    Processing a whole partition at once means a partition's data may be held in memory; when a partition is very large, this can make the node run out of memory.

                    In that case, increase the number of partitions to reduce the amount of data per partition.

        Signature:

        mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

                    preservesPartitioning: whether to keep the parent RDD's partitioner

Source:

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   *
   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
   */
  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }

Example:

public class MapPartitionsOperator {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("JoinOperator").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		// prepare some data
		List<String> names = Arrays.asList("xurunyun","liangyongqi","wangfei");
		JavaRDD<String> nameRDD = sc.parallelize(names);
		
		// mapPartitions
		// the map operator processes one record at a time!!!
		// the mapPartitions operator processes all records of a partition at once!!!
		
		// recommended usage!!!
		// if the RDD is not particularly large, replacing map with mapPartitions can speed things up
		// but with, say, 10 billion records in 10 partitions, one partition holds 1 billion records;
		// mapPartitions is not recommended there, because it can cause an out-of-memory error
		nameRDD.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {

			@Override
			public Iterable<String> call(Iterator<String> iterator) throws Exception {
				List<String> list = new ArrayList<>();
				System.out.println("创建数据库连接");
				while (iterator.hasNext()) {
					String string = iterator.next();
					System.out.println("拼接sql语句:"+string);
					list.add(string);
				}
				System.out.println("批量插入到数据库中");
				return list;
			}
		}).count();
		
		sc.close();
	}
}
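
For the batched database write pattern described above, a minimal Scala sketch; the JDBC URL, credentials and table name are placeholders, not part of the original example:

import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}

object MapPartitionsBatchWrite {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("batchWrite").setMaster("local"))
    val nameRDD = sc.parallelize(Seq("xurunyun", "liangyongqi", "wangfei"), 2)

    val written = nameRDD.mapPartitions { records =>
      // one JDBC connection per partition instead of one per record
      // (URL, credentials and table are placeholders)
      val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "pwd")
      val stmt = conn.prepareStatement("INSERT INTO person(name) VALUES (?)")
      var count = 0
      records.foreach { name =>
        stmt.setString(1, name)
        stmt.addBatch()
        count += 1
      }
      stmt.executeBatch() // one batched write for the whole partition
      stmt.close()
      conn.close()
      Iterator(count)     // mapPartitions must return an Iterator
    }
    println("rows written: " + written.sum().toLong)
    sc.stop()
  }
}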


mapPartitionsWithIndex

        Same as mapPartitions, except the function is also passed the index of the partition being processed.

        Signature:

        mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

                preservesPartitioning: whether to keep the parent RDD's partitioner

Source:

 /**
   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
   * of the original partition.
   *
   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
   */
  def mapPartitionsWithIndex[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
      preservesPartitioning)
  }

Example:

public class MapPartitonsWithIndexOperator {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("MapPartitonsWithIndexOperator").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);

		// prepare some data
		List<String> names = Arrays.asList("xurunyun", "liangyongqi", "wangfei");
		// in cluster mode every server contributes resources --> CPU cores and memory
		// usually one executor process per machine runs the Spark tasks,
		// and that executor is given the machine's CPU cores
		// how much parallelism is appropriate? in theory: number of machines N * CPU cores per machine
		// if several executors run on one machine, the theoretical parallelism is
		// number of machines N * cores per executor * number of executors
		// in practice this theoretical value is usually set 2~3 times higher!!!
		JavaRDD<String> nameRDD = sc.parallelize(names,2);
		
		JavaRDD<String> mapPartitionsWithIndex = nameRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public Iterator<String> call(Integer partitionId, Iterator<String> iterator) throws Exception {
				System.out.println("partitionId:"+partitionId);
				while (iterator.hasNext()) {
					System.out.println(iterator.next());
				}
				return Arrays.asList("aaa").iterator();
			}
		}, true);
		List<String> collect = mapPartitionsWithIndex.collect();
		sc.stop();
	}
}


foreachPartition

        Same as mapPartitions, except that mapPartitions returns an RDD and is a transformation,

        while foreachPartition returns nothing and is an action.

        The foreach operator processes one record at a time.

foreachPartition with a partition index

        There is no dedicated foreachPartitionWithIndex on RDD; the same effect is obtained by combining mapPartitionsWithIndex with an action, as in the sketch below.
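
A minimal sketch of foreachPartition using the same batched-write idea (the printlns stand in for real database calls), followed by the mapPartitionsWithIndex workaround for getting the partition index:

import org.apache.spark.{SparkConf, SparkContext}

object ForeachPartitionSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("foreachPartition").setMaster("local"))
    val nameRDD = sc.parallelize(Seq("xurunyun", "liangyongqi", "wangfei"), 2)

    // action: nothing is returned, the side effect runs once per partition
    nameRDD.foreachPartition { records =>
      println("open database connection")              // once per partition
      records.foreach(r => println("buffer record: " + r))
      println("flush the batch and close the connection")
    }

    // a per-partition foreach that also needs the partition index:
    // mapPartitionsWithIndex followed by an action
    nameRDD.mapPartitionsWithIndex { (idx, records) =>
      records.foreach(r => println("partition " + idx + " -> " + r))
      Iterator(idx)
    }.count() // the action forces the transformation to run

    sc.stop()
  }
}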



Coalesce 

        Repartitioning operator: increases or decreases the number of partitions

Source:

  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions.
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * Note: With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions).values
    } else {
      new CoalescedRDD(this, numPartitions)
    }
  }

Signature: coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T]

        numPartitions: target number of partitions; shuffle: whether to perform a shuffle while repartitioning

Cases:

        If numPartitions is greater than the current number of partitions

                and shuffle == false, the number of partitions does not increase,

                        because without a shuffle there is no way to create more partitions.
               So to increase the number of partitions, shuffle must be true (see the sketch below).

        If numPartitions is less than the current number of partitions

                and shuffle == false, each existing partition's data ends up in exactly one output partition, never split across several, just like a narrow dependency.

                        That is what makes it possible to avoid a shuffle.

                With shuffle == true, the data of one partition is scattered across several output partitions.
Use case

// the coalesce operator shrinks the number of partitions of an RDD,
// compacting the data into fewer partitions
// a common optimization is to apply coalesce right after a filter:
// filter runs on every partition of the RDD,
// so the data can end up skewed; some partitions may be left with only a single record!
// coalesce then makes the data in the remaining partitions more compact!!

Example

public class CoalesceOperator {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("CoalesceOperator").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		// the company originally has 6 departments
		// then, unfortunately, there is a round of layoffs, after which some departments have hardly anyone left!
		// the data is skewed: the departments are uneven, so we consolidate them by compacting the employees into fewer partitions!
		List<String> staffList = Arrays.asList("xuruyun1","xuruyun2","xuruyun3"
				,"xuruyun4","xuruyun5","xuruyun6"
				,"xuruyun7","xuruyun8","xuruyun9"
				,"xuruyun10","xuruyun11","xuruyun12"
				,"xuruyun13","xuruyun14","xuruyun15"
				,"xuruyun16","xuruyun17","xuruyun18"
				,"xuruyun19","xuruyun20","xuruyun21"
				,"xuruyun22","xuruyun23","xuruyun24");
		
		JavaRDD<String> staffRDD = sc.parallelize(staffList, 6);
		JavaRDD<String> staffRDD2 = staffRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
			
			private static final long serialVersionUID = 1L;

			@Override
			public Iterator<String> call(Integer index, Iterator<String> iterator)
					throws Exception {
				List<String> list = new ArrayList<String>();
				while(iterator.hasNext()){
					String staff = iterator.next();
					list.add("部门["+(index)+"]"+staff);
				}
				return list.iterator();
			}
		}, true);
				
		for(String staffInfo : staffRDD2.collect()){
			System.out.println(staffInfo);
		}
		 
 	 	JavaRDD<String> staffRDD3 = staffRDD2.coalesce(3,true);
		//JavaRDD<String> staffRDD3 = staffRDD2.repartition(3);
		
 	 	System.out.println("staffRDD3.partitions().size():"+staffRDD3.partitions().size());
		
		JavaRDD<String> staffRDD4 = staffRDD3.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Iterator<String> call(Integer index, Iterator<String> iterator)
					throws Exception {
				List<String> list = new ArrayList<String>();
				while(iterator.hasNext()){
					String staff = iterator.next();
					list.add("部门["+(index)+"]"+staff);
				}
				return list.iterator();
			}
		}, true);
		for(String staffInfo : staffRDD4.collect()){
			System.out.println(staffInfo);
		}  
	}
}

repartition

        Implemented via coalesce; repartitions the data with a shuffle.

        repartition(numPartitions: Int)  =  coalesce(numPartitions, shuffle = true)

Source:

  /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

Use case

// a classic scenario: when Spark SQL loads data from Hive, the number of partitions of the
// resulting RDD is determined by the number of HDFS blocks of the underlying Hive files,
// and this default partition count cannot be set directly

// sometimes that automatically derived partition count is too small; to optimize,
// increase the parallelism by calling repartition on the RDD (see the sketch below)
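
A minimal sketch of that pattern; the table name and target partition count are placeholders, and sqlContext is assumed to be able to query Hive (e.g. a HiveContext):

val raw = sqlContext.sql("SELECT * FROM some_hive_table").rdd
println(raw.partitions.length)    // driven by the number of HDFS blocks of the table

val tuned = raw.repartition(200)  // shuffle the data into 200 partitions
println(tuned.partitions.length)  // 200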


repartitionAndSortWithinPartitions

        Repartitions the data with the given partitioner and sorts the records within each resulting partition by key

Signature: repartitionAndSortWithinPartitions(partitioner: Partitioner, comp: Comparator[K])

        partitioner: decides which partition each key goes to; comp: comparator used to sort the records within each partition

Source:

  /**
   * Repartition the RDD according to the given partitioner and, within each resulting partition,
   * sort records by their keys.
   *
   * This is more efficient than calling `repartition` and then sorting within each partition
   * because it can push the sorting down into the shuffle machinery.
   */
  def repartitionAndSortWithinPartitions(partitioner: Partitioner, comp: Comparator[K])
    : JavaPairRDD[K, V] = {
    implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
    fromRDD(
      new OrderedRDDFunctions[K, V, (K, V)](rdd).repartitionAndSortWithinPartitions(partitioner))
  }

Example:

public class RepartitionAndSortWithinPartitionsOperator {
	public static void main(String[] args) {
		SparkConf sparkConf = new SparkConf().setAppName("RepartitionAndSortWithinPartitionsOperator").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(sparkConf);
		
		List<Tuple2<Integer,Integer>> list = Arrays.asList(
				new Tuple2<Integer,Integer>(2, 3),
				new Tuple2<Integer,Integer>(1, 2),
				new Tuple2<Integer,Integer>(6, 7),
				new Tuple2<Integer,Integer>(3, 4),
				new Tuple2<Integer,Integer>(5, 6),
				new Tuple2<Integer,Integer>(4, 5)
				);
		JavaPairRDD<Integer,Integer> rdd = sc.parallelizePairs(list,1);
		 
	
		JavaPairRDD<Integer, Integer>  rdd1 = rdd.repartitionAndSortWithinPartitions(new Partitioner() {

			/**
			 * three partitions, with ids 0, 1 and 2
			 */
			@Override
			public int numPartitions() {
				return 3;
			}
			
			@Override
			public int getPartition(Object key) {
				return Integer.valueOf(key+"") % numPartitions();
			}
		},new SortObj());
		
		System.out.println("rdd1.partitions().size():" + rdd1.partitions().size());

		rdd1.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer,Integer>>, Iterator<Tuple2<Integer,Integer>>>() {

			@Override
			public Iterator<Tuple2<Integer, Integer>> call(Integer v1, Iterator<Tuple2<Integer, Integer>> v2) throws Exception {
				while(v2.hasNext()){
					System.out.println("partitionId:" + v1 + "value:" + v2.next());
				}
				return v2;
			}
		}, true).count();
		
		sc.stop();
		
	}
}

public class SortObj implements Comparator<Integer>,Serializable {

	private static final long serialVersionUID = 1L;

	@Override
	public int compare(Integer v1, Integer v2) {
		 
		return v2 - v1;
	}

}


partitionBy

        Repartitions the RDD according to the supplied (custom) partitioner

Signature:  partitionBy(partitioner: Partitioner): RDD[(K, V)]

Source:

  /**
   * Return a copy of the RDD partitioned using the specified partitioner.
   */
  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
  }

Example:

public class PartitionerByOperator {
	 @SuppressWarnings("resource")
	public static void main(String[] args) {
		 
		 SparkConf conf = new SparkConf().setMaster("local").setAppName("PartitionerByOperator");
		 JavaSparkContext sc = new JavaSparkContext(conf);
		 
		 List<Tuple2<Integer, String>> list = new ArrayList<>();
		 list.add(new Tuple2<Integer, String>(1, "bjsxt"));
		 list.add(new Tuple2<Integer, String>(2, "bjsxt"));
		 list.add(new Tuple2<Integer, String>(3, "bjsxt"));
		 list.add(new Tuple2<Integer, String>(4, "bjsxt"));
		 list.add(new Tuple2<Integer, String>(5, "bjsxt"));
		 list.add(new Tuple2<Integer, String>(6, "bjsxt"));
		 
		 JavaPairRDD<Integer, String> nameRDD = sc.parallelizePairs(list);
		 System.out.println("nameRDD.partitions().size():"+nameRDD.partitions().size());
		 
		 JavaPairRDD<Integer, String> partitionByRDD = nameRDD.partitionBy(new Partitioner() {
			 
			@Override
			public int numPartitions() {
				return 2;
			}
			
			@Override
			public int getPartition(Object obj) {
				int i = (int)obj;
				if(i % 2 == 0){
					return 0;
				}else{
					return 1;
				}
			}
		});
		 
		 partitionByRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer,String>>, Iterator<Tuple2<Integer,String>>>() {

			@Override
			public Iterator<Tuple2<Integer, String>> call(Integer v1, Iterator<Tuple2<Integer, String>> v2) throws Exception {
				 int partitionId = v1;
				while (v2.hasNext()) {
					Tuple2<Integer, String> next = v2.next();
					int id = next._1;
					String name = next._2;
					System.out.println("partitionId:"+partitionId + "= id:"+id + "|name:" + name);
					
				}
				return Arrays.asList(new Tuple2<Integer, String>(1, "asd")).iterator();
			}
		}, false).count();
		 
		 sc.stop();
		 
	}	
}


distinct

    Removes duplicate elements; a transformation operator

    Implemented as map (mapToPair in the Java API) -> reduceByKey -> map

Source:

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(): RDD[T] = withScope {
    distinct(partitions.length)
  }

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
  }

Note:

    If distinct throws an OOM error: distinct is built on reduceByKey, so memory is most likely exhausted during the shuffle; apply the usual shuffle optimizations, for example by increasing the number of partitions (see the sketch below).
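
A minimal sketch, assuming a local SparkContext named sc, showing the hand-written equivalent from the source above and the distinct(numPartitions) overload that spreads the shuffle over more tasks (the partition counts are placeholders):

val rdd = sc.parallelize(Seq(3, 2, 3, 4, 5, 1, 2, 5, 8), 3)

// default distinct keeps the current number of partitions
println(rdd.distinct().collect().sorted.mkString(","))   // 1,2,3,4,5,8

// the equivalent hand-written form: map -> reduceByKey -> map
println(rdd.map(x => (x, null)).reduceByKey((x, y) => x).map(_._1).collect().sorted.mkString(","))

// spread the shuffle over more partitions if the default runs out of memory
println(rdd.distinct(24).partitions.length)              // 24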


Sample

        Draws a random sample of the data

Signature: sample(withReplacement: Boolean, fraction: Double, seed: Long): RDD[T]

                    withReplacement: whether an element may be sampled more than once

                    fraction: expected fraction of the source data in the sample, in [0, 1] when sampling without replacement

                    seed: seed for the random number generator; defaults to the random value Utils.random.nextLong. Set it to a constant to get the same sample on every run.

Source:

 /**
   * Return a sampled subset of this RDD.
   *
   * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
   * @param fraction expected size of the sample as a fraction of this RDD's size
   *  without replacement: probability that each element is chosen; fraction must be [0, 1]
   *  with replacement: expected number of times each element is chosen; fraction must be >= 0
   * @param seed seed for the random number generator
   */
  def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T] = withScope {
    require(fraction >= 0.0, "Negative fraction value: " + fraction)
    if (withReplacement) {
      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
    } else {
      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
    }
  }
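
A minimal usage sketch, assuming a local SparkContext named sc (the fraction and seed values are arbitrary):

val rdd = sc.parallelize(1 to 100, 4)

// roughly 10% of the data, each element appearing at most once
println(rdd.sample(withReplacement = false, fraction = 0.1).count())

// fixing the seed makes the sample reproducible across runs
val a = rdd.sample(false, 0.1, seed = 42).collect().toSeq
val b = rdd.sample(false, 0.1, seed = 42).collect().toSeq
println(a == b) // true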


take

        Takes the first num elements of the RDD

        No sorting is applied; partitions are scanned in order: if the first partition has fewer than num rows, the next one is scanned, and so on

Signature: take(num: Int): Array[T]

Source:

 /**
   * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
   * it will be slow if a lot of partitions are required. In that case, use collect() to get the
   * whole RDD instead.
   */
  def take(num: Int): JList[T] = rdd.take(num).toSeq.asJava

/**
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.
   *
   * @note due to complications in the internal implementation, this method will raise
   * an exception if called on an RDD of `Nothing` or `Null`.
   */
  def take(num: Int): Array[T] = withScope {
    if (num == 0) {
      new Array[T](0)
    } else {
      val buf = new ArrayBuffer[T]
      val totalParts = this.partitions.length
      var partsScanned = 0
      while (buf.size < num && partsScanned < totalParts) {
        // The number of partitions to try in this iteration. It is ok for this number to be
        // greater than totalParts because we actually cap it at totalParts in runJob.
        var numPartsToTry = 1
        if (partsScanned > 0) {
          // If we didn't find any rows after the previous iteration, quadruple and retry.
          // Otherwise, interpolate the number of partitions we need to try, but overestimate
          // it by 50%. We also cap the estimation in the end.
          if (buf.size == 0) {
            numPartsToTry = partsScanned * 4
          } else {
            // the left side of max is >=1 whenever partsScanned >= 2
            numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
            numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
          }
        }

        val left = num - buf.size
        val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

        res.foreach(buf ++= _.take(num - buf.size))
        partsScanned += numPartsToTry
      }

      buf.toArray
    }
  }

Example

public class TakeOperator {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("ReduceOperator").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);

		List<Integer> numberList = Arrays.asList(3, 2, 3, 4, 5, 1, 2, 5, 8);
		JavaRDD<Integer> numbers = sc.parallelize(numberList,3);

		// take the first 3 elements
		List<Integer> top3Numbers = numbers.take(3);
		for(Integer num:top3Numbers){
			System.out.println(num);
		}
		
		sc.close();
	}
}

First

        Returns the first element of the RDD; to get the maximum, you can sort the RDD in descending order and then call first() (see the sketch below)

        Implemented via take

Source:

  /**
   * Return the first element in this RDD.
   */
  def first(): T = withScope {
    take(1) match {
      case Array(t) => t
      case _ => throw new UnsupportedOperationException("empty collection")
    }
  }
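
A minimal sketch of the descending-sort-then-first pattern, assuming a local SparkContext named sc:

val numbers = sc.parallelize(Seq(3, 2, 3, 4, 5, 1, 2, 5, 8), 3)

println(numbers.first()) // 3: the first element, no sorting involved

// maximum via descending sort + first (numbers.max() would also work)
println(numbers.sortBy(x => x, ascending = false).first()) // 8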

