Join
Joins two pair RDDs on their keys: for each matching key, the two corresponding values are combined into a tuple.
Implemented on top of cogroup.
Parameters: join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
Variants:
join - inner join
fullOuterJoin - full outer join
leftOuterJoin - left outer join
rightOuterJoin - right outer join
Source:
/**
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = self.withScope {
join(other, new HashPartitioner(numPartitions))
}
/**
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
}
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { case Array(vs, w1s) =>
(vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
}
}
Example:
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object Run {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc)
/**
* id name
* 1 zhangsan
* 2 lisi
* 3 wangwu
*/
val idName = sc.parallelize(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu")))
/**
* id age
* 1 30
* 2 29
* 4 21
*/
val idAge = sc.parallelize(Array((1, 30), (2, 29), (4, 21)))
/** *******************************RDD **********************************/
println("*********************************RDD**********************************")
println("\n内关联(inner join)\n")
// 内关联(inner join)
// 只保留两边id相等的部分
/**
* (1,(zhangsan,30))
* (2,(lisi,29))
*/
idName.join(idAge).collect().foreach(println)
println("\n左外关联(left out join)\n")
// 左外关联(left out join)
// 以左边的数据为标准, 左边的数据一律保留
// 右边分三情况:
// 一: 左边的id, 右边有, 则合并数据; (1,(zhangsan,Some(30)))
// 二: 左边的id, 右边没有, 则右边为空; (3,(wangwu,None))
// 三: 右边的id, 左边没有, 则不保留; 右边有id为4的行, 但结果中并未保留
/**
* (1,(zhangsan,Some(30)))
* (2,(lisi,Some(29)))
* (3,(wangwu,None))
*/
idName.leftOuterJoin(idAge).collect().foreach(println)
println("\n右外关联(right outer join)\n")
// 右外关联(right outer join)
// 以右边的数据为标准, 右边的数据一律保留
// 左边分三种情况:
// 一: 右边的id, 左边有, 则合并数据; (1,(Some(zhangsan),30))
// 二: 右边的id, 左边没有, 则左边为空; (4,(None,21))
// 三: 左边的id, 右边没有, 则不保留; 左边有id为3的行, 但结果中并为保留
/**
* (1,(Some(zhangsan),30))
* (2,(Some(lisi),29))
* (4,(None,21))
*/
idName.rightOuterJoin(idAge).collect().foreach(println)
println("\n全外关联(full outer join)\n")
// 全外关联(full outer join)
/**
*
* (1,(Some(zhangsan),Some(30)))
* (2,(Some(lisi),Some(29)))
* (3,(Some(wangwu),None))
* (4,(None,Some(21)))
*/
idName.fullOuterJoin(idAge).collect().foreach(println)
/** *******************************DataFrame **********************************/
val schema1 = StructType(Array(StructField("id", DataTypes.IntegerType, nullable = true), StructField("name", DataTypes.StringType, nullable = true)))
val idNameDF = sqlContext.createDataFrame(idName.map(t => Row(t._1, t._2)), schema1)
val schema2 = StructType(Array(StructField("id", DataTypes.IntegerType, nullable = true), StructField("age", DataTypes.IntegerType, nullable = true)))
val idAgeDF = sqlContext.createDataFrame(idAge.map(t => Row(t._1, t._2)), schema2)
println("*********************************DataFrame**********************************")
println("\n内关联(inner join)\n")
// 相当于调用, idNameDF.join(idAgeDF, Seq("id"), "inner").collect().foreach(println)
// 这里只是调用了封装的API
idNameDF.join(idAgeDF, "id").collect().foreach(println)
println("\n左外关联(left out join)\n")
idNameDF.join(idAgeDF, Seq("id"), "left_outer").collect().foreach(println)
println("\n右外关联(right outer join)\n")
idNameDF.join(idAgeDF, Seq("id"), "right_outer").collect().foreach(println)
println("\n全外关联(full outer join)\n")
idNameDF.join(idAgeDF, Seq("id"), "outer").collect().foreach(println)
println("\nleft semi join\n")
// left semi join
// 左边的id, 在右边有, 就保留左边的数据; 右边的数据不保留, 只有id的有意义的
/**
* [1,zhangsan]
* [2,lisi]
*/
idNameDF.join(idAgeDF, Seq("id"), "leftsemi").collect().foreach(println)
}
}
Difference between cogroup and join
When the same key appears more than once, join produces a Cartesian product of the matching values, while cogroup groups all values for a key into buffers; a short sketch follows, and the full example after it shows the same effect with fullOuterJoin.
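A minimal Scala sketch of this difference (the object name and the toy data are illustrative, not taken from the original example): with a duplicated key, join emits one row per matching value pair, while cogroup emits a single row whose values are grouped into CompactBuffers.
import org.apache.spark.{SparkConf, SparkContext}
object JoinVsCogroup {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JoinVsCogroup").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val left = sc.parallelize(Array((1, "a"), (2, "b")))
    val right = sc.parallelize(Array((2, 90), (2, 95)))
    // join: key 2 matches two values on the right, so two rows come out:
    // (2,(b,90)) and (2,(b,95)); key 1 has no match and is dropped
    left.join(right).collect().foreach(println)
    // cogroup: one row per key, with all matching values grouped into buffers:
    // (1,(CompactBuffer(a),CompactBuffer()))
    // (2,(CompactBuffer(b),CompactBuffer(90, 95)))
    left.cogroup(right).collect().foreach(println)
    sc.stop()
  }
}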
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object Run {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc)
/**
* id name
* 1 zhangsan
* 2 lisi
* 3 wangwu
*/
val idName = sc.parallelize(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu")))
/**
* id age
* 1 30
* 2 29
* 4 21
*/
val idAge = sc.parallelize(Array((1, 30), (2, 29), (4, 21)))
println("\ncogroup\n")
/**
* (1,(CompactBuffer(zhangsan),CompactBuffer(30)))
* (2,(CompactBuffer(lisi),CompactBuffer(29)))
* (3,(CompactBuffer(wangwu),CompactBuffer()))
* (4,(CompactBuffer(),CompactBuffer(21)))
*/
idName.cogroup(idAge).collect().foreach(println)
println("\njoin\n")
// fullOuterJoin gives results similar to cogroup; only the data structure differs
/**
* (1,(Some(zhangsan),Some(30)))
* (2,(Some(lisi),Some(29)))
* (3,(Some(wangwu),None))
* (4,(None,Some(21)))
*/
idName.fullOuterJoin(idAge).collect().foreach(println)
/**
* id score
* 1 100
* 2 90
* 2 95
*/
val idScore = sc.parallelize(Array((1, 100), (2, 90), (2, 95)))
println("\ncogroup, 出现相同id时\n")
/**
* (1,(CompactBuffer(zhangsan),CompactBuffer(100)))
* (2,(CompactBuffer(lisi),CompactBuffer(90, 95)))
* (3,(CompactBuffer(wangwu),CompactBuffer()))
*/
idName.cogroup(idScore).collect().foreach(println)
println("\njoin, 出现相同id时\n")
/**
* (1,(Some(zhangsan),Some(100)))
* (2,(Some(lisi),Some(90)))
* (2,(Some(lisi),Some(95)))
* (3,(Some(wangwu),None))
*/
idName.fullOuterJoin(idScore).collect().foreach(println)
}
}
mapPartitions
map: processes one record at a time.
mapPartitions: processes one partition at a time; commonly used for batched database writes, to avoid opening a database connection for every record.
Note:
Each call loads a whole partition into memory, so when a partition is very large the node may run out of memory.
In that case, increase the number of partitions to shrink the amount of data per partition.
Parameters:
mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
preservesPartitioning: whether to preserve the parent RDD's partitioner
Source
/**
* Return a new RDD by applying a function to each partition of this RDD.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
Example:
public class MapPartitionsOperator {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JoinOperator").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 准备一下数据
List<String> names = Arrays.asList("xurunyun","liangyongqi","wangfei");
JavaRDD<String> nameRDD = sc.parallelize(names);
// mapPartitions
// the map operator processes the records of a partition one at a time
// the mapPartitions operator processes all the records of a partition in one call
// recommended use case:
// if the RDD is not particularly large, replacing map with mapPartitions can speed things up
// but with, say, 10 billion records in 10 partitions, each partition holds 1 billion records;
// mapPartitions is not advisable there, because it may cause an out-of-memory error
nameRDD.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
@Override
public Iterable<String> call(Iterator<String> iterator) throws Exception {
List<String> list = new ArrayList<>();
System.out.println("创建数据库连接");
while (iterator.hasNext()) {
String string = iterator.next();
System.out.println("拼接sql语句:"+string);
list.add(string);
}
System.out.println("批量插入到数据库中");
return list;
}
}).count();
sc.close();
}
}
mapPartitionsWithIndex
Same as mapPartitions, except that the partition index is passed to the function along with the partition's iterator.
Parameters:
mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
preservesPartitioning: whether to preserve the parent RDD's partitioner
Source:
/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*
* `preservesPartitioning` indicates whether the input function preserves the partitioner, which
* should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
*/
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}
Example:
public class MapPartitonsWithIndexOperator {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("MapPartitonsWithIndexOperator").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// prepare some data
List<String> names = Arrays.asList("xurunyun", "liangyongqi", "wangfei");
// in cluster mode each server contributes resources --> CPU cores and memory
// usually one executor process per machine runs the Spark tasks
// and that executor gets the machine's CPU cores
// how much parallelism is appropriate? in theory: number of machines N * CPU cores per machine
// if several executor processes run on one machine, the theoretical parallelism is
// number of machines N * CPU cores per executor * number of executors
// in practice, this theoretical value is usually multiplied by 2~3
JavaRDD<String> nameRDD = sc.parallelize(names,2);
JavaRDD<String> mapPartitionsWithIndex = nameRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Iterator<String> call(Integer partitionId, Iterator<String> iterator) throws Exception {
System.out.println("partitionId:"+partitionId);
while (iterator.hasNext()) {
System.out.println(iterator.next());
}
return Arrays.asList("aaa").iterator();
}
}, true);
List<String> collect = mapPartitionsWithIndex.collect();
sc.stop();
}
}
foreachPartition
Same as mapPartitions, except that mapPartitions returns an RDD and is a transformation operator,
while foreachPartition has no return value and is an action operator.
The foreach operator processes one record at a time.
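This subsection has no example of its own, so here is a minimal Scala sketch of foreachPartition used for a batched write; the object name and data are illustrative, and the println calls merely stand in for a real database connection and batch insert.
import org.apache.spark.{SparkConf, SparkContext}
object ForeachPartitionSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ForeachPartitionSketch").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val names = sc.parallelize(Seq("xurunyun", "liangyongqi", "wangfei"), 2)
    // foreachPartition is an action: it returns Unit and runs immediately,
    // opening one "connection" per partition instead of per record
    names.foreachPartition { iter =>
      println("open database connection")        // placeholder for a real connection
      iter.foreach(name => println("build SQL statement: " + name))
      println("batch insert into the database")  // placeholder for a real batch write
    }
    sc.stop()
  }
}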
foreachPartitionsWithIndex
Same as mapPartitionsWithIndex.
Coalesce
Repartitioning operator: increases or decreases the number of partitions.
Source
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* Note: With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions).values
} else {
new CoalescedRDD(this, numPartitions)
}
}
Parameters: coalesce(numPartitions: Int, shuffle: Boolean): JavaRDD[T]
numPartitions: target number of partitions; shuffle: whether the repartitioning produces a shuffle
Cases (see the sketch after this list):
If numPartitions is greater than the current number of partitions:
with shuffle == false the number of partitions will not increase,
because without a shuffle there is no way to add partitions;
so to increase the number of partitions, shuffle must be true.
If numPartitions is less than the current number of partitions:
with shuffle == false the data of one parent partition goes to exactly one output partition, never to several, just like a narrow dependency,
which is what avoids a shuffle;
with shuffle == true the data of one parent partition is scattered across several output partitions.
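A small Scala check of the cases above (the object name is illustrative, local master assumed); it prints the partition counts produced by coalesce with and without a shuffle.
import org.apache.spark.{SparkConf, SparkContext}
object CoalesceShuffleSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CoalesceShuffleSketch").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val rdd = sc.parallelize(1 to 100, 4)
    // shrinking without a shuffle: narrow dependency, 4 -> 2
    println(rdd.coalesce(2, false).partitions.length)  // 2
    // trying to grow without a shuffle has no effect: still 4
    println(rdd.coalesce(8, false).partitions.length)  // 4
    // growing only works with shuffle = true: 4 -> 8
    println(rdd.coalesce(8, true).partitions.length)   // 8
    sc.stop()
  }
}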
Use case
// the coalesce operator shrinks the number of partitions of an RDD
// packing the data into fewer partitions
// typical use: coalesce is often applied as an optimization right after a filter
// filter is applied to every partition of the RDD,
// and the result can be skewed: some partitions may be left with only a single record!
// coalesce is recommended there, so that every partition ends up nicely packed with data
Example
public class CoalesceOperator {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("CoalesceOperator").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// the company originally had 6 departments
// but unluckily a round of layoffs hit, and afterwards some departments had no staff left!
// the data is skewed: departments are unevenly staffed, so we consolidate them, packing the employees of different departments together
List<String> staffList = Arrays.asList("xuruyun1","xuruyun2","xuruyun3"
,"xuruyun4","xuruyun5","xuruyun6"
,"xuruyun7","xuruyun8","xuruyun9"
,"xuruyun10","xuruyun11","xuruyun12"
,"xuruyun13","xuruyun14","xuruyun15"
,"xuruyun16","xuruyun17","xuruyun18"
,"xuruyun19","xuruyun20","xuruyun21"
,"xuruyun22","xuruyun23","xuruyun24");
JavaRDD<String> staffRDD = sc.parallelize(staffList, 6);
JavaRDD<String> staffRDD2 = staffRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
private static final long serialVersionUID = 1L;
@Override
public Iterator<String> call(Integer index, Iterator<String> iterator)
throws Exception {
List<String> list = new ArrayList<String>();
while(iterator.hasNext()){
String staff = iterator.next();
list.add("部门["+(index)+"]"+staff);
}
return list.iterator();
}
}, true);
for(String staffInfo : staffRDD2.collect()){
System.out.println(staffInfo);
}
JavaRDD<String> staffRDD3 = staffRDD2.coalesce(3,true);
//JavaRDD<String> staffRDD3 = staffRDD2.repartition(3);
System.out.println("staffRDD3.partitions().size():"+staffRDD3.partitions().size());
JavaRDD<String> staffRDD4 = staffRDD3.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
private static final long serialVersionUID = 1L;
@Override
public Iterator<String> call(Integer index, Iterator<String> iterator)
throws Exception {
List<String> list = new ArrayList<String>();
while(iterator.hasNext()){
String staff = iterator.next();
list.add("部门["+(index)+"]"+staff);
}
return list.iterator();
}
}, true);
for(String staffInfo : staffRDD4.collect()){
System.out.println(staffInfo);
}
}
}
repartition
Implemented on top of coalesce; repartitions with a shuffle.
repartition(numPartitions: Int) = coalesce(numPartitions, shuffle = true)
Source:
/**
* Return a new RDD that has exactly numPartitions partitions.
*
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses
* a shuffle to redistribute data.
*
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
Use case:
// a classic scenario: when Spark SQL reads data from a Hive table, the number of partitions
// of the resulting RDD is decided by the number of HDFS blocks backing the table,
// and this default partition count cannot be configured directly
// sometimes the partition count it picks is too small; to optimize,
// raise the parallelism by applying the repartition operator to the RDD (see the sketch below)
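A minimal Scala sketch of this use case (the object name and the starting partition count are illustrative); repartition always shuffles, so it can raise the parallelism.
import org.apache.spark.{SparkConf, SparkContext}
object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RepartitionSketch").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    // pretend this RDD came back from a source with too few partitions
    val rdd = sc.parallelize(1 to 1000, 2)
    println(rdd.partitions.length)    // 2
    // repartition always shuffles, so the partition count can grow
    val wider = rdd.repartition(8)
    println(wider.partitions.length)  // 8
    sc.stop()
  }
}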
repartitionAndSortWithinPartitions
Repartitions the RDD with the given partitioner and sorts the records within each resulting partition.
Parameters: repartitionAndSortWithinPartitions(partitioner: Partitioner, comp: Comparator[K])
partitioner: the partitioner that decides which partition each record goes to; comp: the comparator used to sort the records inside each partition
Source
/**
* Repartition the RDD according to the given partitioner and, within each resulting partition,
* sort records by their keys.
*
* This is more efficient than calling `repartition` and then sorting within each partition
* because it can push the sorting down into the shuffle machinery.
*/
def repartitionAndSortWithinPartitions(partitioner: Partitioner, comp: Comparator[K])
: JavaPairRDD[K, V] = {
implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
fromRDD(
new OrderedRDDFunctions[K, V, (K, V)](rdd).repartitionAndSortWithinPartitions(partitioner))
}
Example:
public class RepartitionAndSortWithinPartitionsOperator {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("RepartitionAndSortWithinPartitionsOperator").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
List<Tuple2<Integer,Integer>> list = Arrays.asList(
new Tuple2<Integer,Integer>(2, 3),
new Tuple2<Integer,Integer>(1, 2),
new Tuple2<Integer,Integer>(6, 7),
new Tuple2<Integer,Integer>(3, 4),
new Tuple2<Integer,Integer>(5, 6),
new Tuple2<Integer,Integer>(4, 5)
);
JavaPairRDD<Integer,Integer> rdd = sc.parallelizePairs(list,1);
JavaPairRDD<Integer, Integer> rdd1 = rdd.repartitionAndSortWithinPartitions(new Partitioner() {
/**
 * three partitions, with ids 0, 1 and 2
 * @return
 */
@Override
public int numPartitions() {
return 3;
}
@Override
public int getPartition(Object key) {
return Integer.valueOf(key+"") % numPartitions();
}
},new SortObj());
System.out.println("rdd1.partitions().size():" + rdd1.partitions().size());
rdd1.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer,Integer>>, Iterator<Tuple2<Integer,Integer>>>() {
@Override
public Iterator<Tuple2<Integer, Integer>> call(Integer v1, Iterator<Tuple2<Integer, Integer>> v2) throws Exception {
while(v2.hasNext()){
System.out.println("partitionId:" + v1 + "value:" + v2.next());
}
return v2;
}
}, true).count();
sc.stop();
}
}
public class SortObj implements Comparator<Integer>,Serializable {
private static final long serialVersionUID = 1L;
@Override
public int compare(Integer v1, Integer v2) {
return v2 - v1;
}
}
partitionBy
Partitions the RDD with the supplied custom partitioner.
Parameters: partitionBy(partitioner: Partitioner): RDD[(K, V)]
Source:
/**
* Return a copy of the RDD partitioned using the specified partitioner.
*/
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
if (self.partitioner == Some(partitioner)) {
self
} else {
new ShuffledRDD[K, V, V](self, partitioner)
}
}
Example:
public class PartitionerByOperator {
@SuppressWarnings("resource")
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("PartitionerByOperator");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<Integer, String>> list = new ArrayList<>();
list.add(new Tuple2<Integer, String>(1, "bjsxt"));
list.add(new Tuple2<Integer, String>(2, "bjsxt"));
list.add(new Tuple2<Integer, String>(3, "bjsxt"));
list.add(new Tuple2<Integer, String>(4, "bjsxt"));
list.add(new Tuple2<Integer, String>(5, "bjsxt"));
list.add(new Tuple2<Integer, String>(6, "bjsxt"));
JavaPairRDD<Integer, String> nameRDD = sc.parallelizePairs(list);
System.out.println("nameRDD.partitions().size():"+nameRDD.partitions().size());
JavaPairRDD<Integer, String> partitionByRDD = nameRDD.partitionBy(new Partitioner() {
@Override
public int numPartitions() {
return 2;
}
@Override
public int getPartition(Object obj) {
int i = (int)obj;
if(i % 2 == 0){
return 0;
}else{
return 1;
}
}
});
partitionByRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer,String>>, Iterator<Tuple2<Integer,String>>>() {
@Override
public Iterator<Tuple2<Integer, String>> call(Integer v1, Iterator<Tuple2<Integer, String>> v2) throws Exception {
int partitionId = v1;
while (v2.hasNext()) {
Tuple2<Integer, String> next = v2.next();
int id = next._1;
String name = next._2;
System.out.println("partitionId:"+partitionId + "= id:"+id + "|name:" + name);
}
return Arrays.asList(new Tuple2<Integer, String>(1, "asd")).iterator();
}
}, false).count();
sc.stop();
}
}
distinct
Removes duplicate records; a transformation operator.
Underlying implementation: mapToPair --> reduceByKey --> map
Source:
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
Note:
If an OOM occurs when using distinct, the cause is that distinct is built on reduceByKey, so the memory overflow happens during the shuffle; apply the usual shuffle tuning techniques.
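Example (a minimal Scala sketch; the object name and data are illustrative):
import org.apache.spark.{SparkConf, SparkContext}
object DistinctSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DistinctSketch").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val numbers = sc.parallelize(Seq(3, 2, 3, 4, 5, 1, 2, 5, 8), 3)
    // distinct is a transformation; collect() triggers it
    // equivalent to: map(x => (x, null)).reduceByKey((x, y) => x).map(_._1)
    numbers.distinct().collect().sorted.foreach(println)  // 1 2 3 4 5 8
    sc.stop()
  }
}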
Sample
Samples the data.
Parameters: sample(withReplacement: Boolean, fraction: Double, seed: Long): RDD[T]
withReplacement: whether an element may appear more than once in the sample
fraction: the expected fraction of the source data in the sample, in [0, 1]
seed: seed for the random number generator; it defaults to a random value (Utils.random.nextLong). Set it to a constant to get the same sample on every run.
Source:
/**
* Return a sampled subset of this RDD.
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be >= 0
* @param seed seed for the random number generator
*/
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T] = withScope {
require(fraction >= 0.0, "Negative fraction value: " + fraction)
if (withReplacement) {
new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
} else {
new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
}
}
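Example (a minimal Scala sketch; the object name, fraction and seed are illustrative):
import org.apache.spark.{SparkConf, SparkContext}
object SampleSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SampleSketch").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val rdd = sc.parallelize(1 to 100)
    // without replacement: each element is kept with probability ~0.1,
    // so the sample size is close to, but not exactly, 10
    println(rdd.sample(withReplacement = false, fraction = 0.1).count())
    // fixing the seed makes the sample reproducible across runs
    println(rdd.sample(withReplacement = false, fraction = 0.1, seed = 42L).collect().mkString(","))
    sc.stop()
  }
}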
take
Takes the first num rows of the RDD.
It does not sort; it scans partitions one by one: if the first partition has fewer than num rows, it continues with the next one.
Parameters: take(num: Int): Array[T]
Source
/**
* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
* whole RDD instead.
*/
def take(num: Int): JList[T] = rdd.take(num).toSeq.asJava
/**
* Take the first num elements of the RDD. It works by first scanning one partition, and use the
* results from that partition to estimate the number of additional partitions needed to satisfy
* the limit.
*
* @note due to complications in the internal implementation, this method will raise
* an exception if called on an RDD of `Nothing` or `Null`.
*/
def take(num: Int): Array[T] = withScope {
if (num == 0) {
new Array[T](0)
} else {
val buf = new ArrayBuffer[T]
val totalParts = this.partitions.length
var partsScanned = 0
while (buf.size < num && partsScanned < totalParts) {
// The number of partitions to try in this iteration. It is ok for this number to be
// greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1
if (partsScanned > 0) {
// If we didn't find any rows after the previous iteration, quadruple and retry.
// Otherwise, interpolate the number of partitions we need to try, but overestimate
// it by 50%. We also cap the estimation in the end.
if (buf.size == 0) {
numPartsToTry = partsScanned * 4
} else {
// the left side of max is >=1 whenever partsScanned >= 2
numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
}
}
val left = num - buf.size
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
res.foreach(buf ++= _.take(num - buf.size))
partsScanned += numPartsToTry
}
buf.toArray
}
}
Example
public class TakeOperator {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("ReduceOperator").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Integer> numberList = Arrays.asList(3, 2, 3, 4, 5, 1, 2, 5, 8);
JavaRDD<Integer> numbers = sc.parallelize(numberList,3);
// take the first 3 rows
List<Integer> top3Numbers = numbers.take(3);
for(Integer num:top3Numbers){
System.out.println(num);
}
sc.close();
}
}
First
Returns the first row of the RDD. To get the maximum value of an RDD, sort it in descending order first and then call first() (see the sketch at the end of this section).
Implemented on top of take.
Source:
/**
* Return the first element in this RDD.
*/
def first(): T = withScope {
take(1) match {
case Array(t) => t
case _ => throw new UnsupportedOperationException("empty collection")
}
}
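Example (a minimal Scala sketch; the object name and data are illustrative), showing first() and the sort-descending-then-first() trick for the maximum:
import org.apache.spark.{SparkConf, SparkContext}
object FirstSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FirstSketch").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val numbers = sc.parallelize(Seq(3, 2, 3, 4, 5, 1, 2, 5, 8), 3)
    // first() simply returns the first element (implemented with take(1))
    println(numbers.first())                                    // 3
    // to get the maximum, sort in descending order and take the first row
    println(numbers.sortBy(x => x, ascending = false).first())  // 8
    sc.stop()
  }
}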