如果一个变量在Driver端定义,再在算子中被修改,然后又想在Driver端取到修改后结果。

            

那上面的例子是做不到的:

        因为该变量在Driver定义,而算子中的代码逻辑是在Executor中执行的,Spark会把这个变量复制一份给Executor,所以Driver中的变量并没有改变,也就得不到修改后的值。

而使用Accumulator可以解决这个问题

        累加器相当于集群规模的变量

def accumulator() = {
    val conf = new SparkConf();
    conf.setMaster("local").setAppName("accumulator")
    val sc = new SparkContext(conf)
    /**
      *  累加器只能在Driver端定义,因为需要SparkContext
      *  累加器在executor端操作(累加)
      *  累加器只能在Driver端读取,不能在executor端读取
      *
      *  sc.accumulator(初始值) 会返回一个累加器
      */
    val count = sc.accumulator(0)
    val list = List(1,2,3,4,5,6,6)
    val rdd1 = sc.parallelize(list)
    var rdd2 = rdd1.filter( x => {
      count += x
      true
    }).count();

    println(count)
    sc.stop()
  }
累加器有三个注意点:

        累加器只能在Driver端定义,因为需要SparkContext

        累加器在executor端修改(累加)

        累加器只能在Driver端读取,不能在executor端读取

自定义累计器:

import com.bjsxt.spark.constant.Constants;
import com.bjsxt.spark.util.StringUtils;
import org.apache.spark.AccumulatorParam;

public class MonitorAndCameraStateAccumulator implements AccumulatorParam<String> {

	@Override
	public String addInPlace(String v1, String v2) {
		return add(v1, v2);
	}

	/**
	 * 初始化的值
	 */
	@Override
	public String zero(String v1) {
		return Constants.FIELD_NORMAL_MONITOR_COUNT+"=0|"
				+ Constants.FIELD_NORMAL_CAMERA_COUNT+"=0|"
						+ Constants.FIELD_ABNORMAL_MONITOR_COUNT+"=0|"
								+ Constants.FIELD_ABNORMAL_CAMERA_COUNT+"=0|"
										+ Constants.FIELD_ABNORMAL_MONITOR_CAMERA_INFOS+"= "; 
	}

	@Override
	public String addAccumulator(String v1, String v2) {
		return add(v1, v2);
	}

	  
	/**
	 * @param v1 连接串,上次累加后的结果
	 * @param v2 本次累加传入的值
	 * @return 更新以后的连接串
	 */
	private String add(String v1, String v2) {
		 if(StringUtils.isEmpty(v1)){
			return v2;
		} 
		String[] valArr = v2.split("\\|");
		for (String string : valArr) {
			String[] fieldAndValArr = string.split("=",2);
			String field = fieldAndValArr[0];
			String value = fieldAndValArr[1];
			String oldVal = StringUtils.getFieldFromConcatString(v1, "\\|", field);
			if(oldVal != null){
				if(Constants.FIELD_ABNORMAL_MONITOR_CAMERA_INFOS.equals(field)){
					v1 = StringUtils.setFieldInConcatString(v1, "\\|", field, oldVal + "~" + value); 
				}else{
					int newVal = Integer.parseInt(oldVal)+Integer.parseInt(value);
					v1 = StringUtils.setFieldInConcatString(v1, "\\|", field, String.valueOf(newVal));  
				}
			}
		}
		return v1;
	}
}


广播变量

        由以上累加器可以知道 算子中是可以使用 Driver端定义的变量的,但是这个有一个问题,因为它会把这个变量给 每个Task  复制一份,如果这个变量小倒还没事,但是这个变量占用内存大的话,那就会对网络和内存都是大的负担,这个可以使用广播变量来解决这个问题:

     

        广播变量只会在 每个Executor 上保存一个变量的副本

        相当于集群规模的常量

广播变量有三个注意点:

    广播变量只能在Driver端定义,因为需要SparkContext

    广播变量可以在Driver端修改

    广播变量只能在Executor读取,不能修改

问题:RDD可以被广播变量广播出去吗?

    首先RDD是不能广播的,因为RDD本身是不存储数据,它存储的只是逻辑。

    如果要广播RDD里的数据,那么可以 先用 collect 算子将数据拉取到 Driver 端,再将数据集合广播出去。

  // 对listRDD中值过滤掉filterRdd中有的
  def broadcast() = {
    val conf = new SparkConf();
    conf.setMaster("local").setAppName("broadcast")
    val sc = new SparkContext(conf)

    val filterRdd = sc.makeRDD(List("张三","李四"))
    val listRDD = sc.makeRDD(List("我是张三!","我是jeff!","我是dc!","我是李四!","我是张三!","我是devin!","我是张三!",
      "我是pj!","我是张三!","我是zxy!","我是xy!","我是张三!","我是李四!","我是lxinr!"))

    // collect 算子拉取到Driver端
    val filterList = sc.broadcast(filterRdd.collect())

    val rdd = listRDD.filter(x => {
        var count = 0
        filterList.value.foreach(filterWord => {
          if (x.contains(filterWord)) {
            count += 1
          }
        })
        if (count == 0) {
          true
        } else {
          false
        }
    })

    rdd.collect().foreach(println)
    sc.stop()
  }


添加新评论