Spark Streaming receives data continuously, but traffic has peaks and troughs: at peak times the data volume is large, at off-peak times it is small, which makes the batch interval hard to tune.
    However, a middleware layer (such as Flume or Kafka) can buffer the data: producers write to the middleware first, and Spark Streaming then reads from it,
    so the input rate is smoothed out and better suited to stream processing.

Here Kafka is used as the middleware. There are two ways to integrate Spark Streaming with Kafka: Receiver and Direct.


Receiver mode

    Uses Kafka's high-level consumer API and stores consumed offsets in ZooKeeper.

    (Receiver mode diagram)



   When the Spark Streaming application starts, it creates a job that distributes receiver tasks to the executors; these receiver tasks receive the data from Kafka.

    The received data is persisted; the default storage level is MEMORY_AND_DISK_SER_2, and this level can be changed.

    The receiver task distributes and replicates the received data across different nodes:

                Every spark.streaming.blockInterval a block file is generated and distributed; every batchInterval the accumulated blocks are marked as one batch (an RDD).

                Different block files are distributed to different nodes, so each file corresponds to one block of that batch.

    Once all of the batch's block files have been distributed, the receiver task updates the consumed offsets in ZooKeeper,

    then reports the block file locations to the ReceiverTracker in the driver.

    The DAGScheduler in the driver then dispatches tasks to those nodes according to the file locations and data-locality levels, to process that batch's data.


Problems with receiver mode

    When the driver process dies, the executors it allocated are killed along with it. First: if the driver dies right after the offsets have been updated in ZooKeeper but before the data has been consumed, that portion of the data is never consumed. Second: once the driver is gone the block data can no longer be located, which effectively means data loss.

    How can this be solved?

        Enable the WAL (write-ahead log) mechanism.

                While replicating the received data to other nodes, also back it up to HDFS (with the WAL in place, the persistence level of the received data can be downgraded to MEMORY_AND_DISK). This guarantees the safety of the data.

                However, writing to HDFS costs performance: ZooKeeper can only be updated and the locations reported after the backup completes, which lengthens the job's execution time and increases latency.

                How to enable it (a minimal sketch follows):

                            set spark.streaming.receiver.writeAheadLog.enable to true,

                            and specify the storage location via the streaming context's checkpoint directory: jsc.checkpoint(path)
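
A minimal sketch of the two settings together (the HDFS path is illustrative; the full example further below does the same with a local path):

SparkConf conf = new SparkConf()
        .setAppName("ReceiverWithWAL")
        .setMaster("local[2]")
        // enable the write-ahead log for received data
        .set("spark.streaming.receiver.writeAheadLog.enable", "true");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
// the WAL is written under the checkpoint directory (illustrative HDFS path)
jsc.checkpoint("hdfs://192.168.126.111:9000/spark/receivedata");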


Setting receiver parallelism

        The receiver's parallelism is determined by spark.streaming.blockInterval, which defaults to 200ms.

        Suppose batchInterval is 5s. A block is produced every blockInterval, and a batch (RDD) every batchInterval.

            Each block becomes one partition of that batch RDD, so the RDD produced every batchInterval has 5000ms / 200ms = 25 partitions, i.e. a parallelism of 25.

            That is: parallelism (number of partitions) = batchInterval / blockInterval

       To raise the parallelism, decrease blockInterval (preferably not below 50ms; too small a value leaves too little data per block), or increase batchInterval. A worked example follows.
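
A quick sketch of the formula in action (the values mirror the text above):

// batchInterval = 5s, blockInterval = 200ms (the default)
//   partitions per batch RDD = 5000ms / 200ms = 25
// lowering blockInterval to 100ms raises that to 5000 / 100 = 50
SparkConf conf = new SparkConf()
        .setAppName("BlockIntervalDemo")
        .set("spark.streaming.blockInterval", "100ms");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));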


Integration method:

        Create a JavaPairReceiverInputDStream via the KafkaUtils.createStream() method.

                   JavaPairReceiverInputDStream is a KV-format DStream (K: Kafka message key, V: message value).

        Add the dependency spark-streaming-kafka_2.10-1.6.0.jar (1.6.0 is the Spark version, 2.10 the Scala version).

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param jssc      JavaStreamingContext object
   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..).
   * @param groupId   The group id for this consumer.
   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                  in its own thread.
   * @param storageLevel RDD storage level.
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      jssc: JavaStreamingContext,  // the StreamingContext
      zkQuorum: String,     // ZooKeeper quorum (offsets and other metadata are read from ZooKeeper while receiving Kafka data)
      groupId: String,      // the consumer group
      topics: JMap[String, JInt],  // map of {topic to consume -> number of threads reading its partitions, ...}
      storageLevel: StorageLevel  // persistence level for the received data
    ): JavaPairReceiverInputDStream[String, String] = {
    createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
      storageLevel)
  }

Code:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class SparkStreamingOnKafkaReceiver {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SparkStreamingOnKafkaReceiver")
                .setMaster("local[2]")
                .set("spark.streaming.receiver.writeAheadLog.enable", "true");
        //	    .set("spark.streaming.concurrentJobs", "10");

        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
        // where the write-ahead log is stored
        jsc.checkpoint("d://receivedata");

        //JavaReceiverInputDStream lines = FlumeUtils.createStream(jsc,"192.168.5.1", 9999);
        //JavaReceiverInputDStream lines = FlumeUtils.createPollingStream(jsc, "192.168.5.128", 8899);

        /**
         * 1. Use the five-argument overload of KafkaUtils.createStream to set the receiver's storage level.
         * 2. To use multiple receivers in Java, convert each JavaPairReceiverInputDStream to a
         *    JavaDStream via toJavaDStream and union them; in Scala:
         *  val numStreams = 5
         *  val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
         *  val unifiedStream = streamingContext.union(kafkaStreams)
         */
        Map<String, Integer> topicConsumerConcurrency = new HashMap<String, Integer>();
        // Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own
        // thread, i.e. the value is the number of threads reading the topic's partitions concurrently.
        topicConsumerConcurrency.put("Abgelababy", 1);

        // 1st argument: the StreamingContext
        // 2nd argument: the ZooKeeper quorum (offsets and other metadata are read from ZooKeeper while receiving Kafka data)
        // 3rd argument: the consumer group
        // 4th argument: map of {topic to consume : number of threads reading its partitions, ...}
        // 5th argument: the persistence level; with the write-ahead log enabled, the _2 replication suffix of MEMORY_AND_DISK_2 is unnecessary
        JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jsc,
                "192.168.126.111:2181,192.168.126.112:2181,192.168.126.113:2181", "MyFiestConsumerGroup", topicConsumerConcurrency, StorageLevel.MEMORY_ONLY());
        /**
         * lines is a KV-format DStream; K: Kafka message key, V: message value
         */
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {

            @Override
            public Iterable<String> call(Tuple2<String, String> tuple) throws Exception {
                return Arrays.asList(tuple._2.split("\t"));
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { // accumulate values for the same key (reduced both locally and at the reducer level)

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });


        wordsCount.print();
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}
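
The multi-receiver pattern from the comment in the code above, sketched in Java (a sketch assuming the same jsc, ZooKeeper quorum, group, and topicConsumerConcurrency map as the example, plus java.util.List/ArrayList imports; the receiver count is illustrative):

int numStreams = 5; // illustrative; each receiver permanently occupies one executor core
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>();
for (int i = 0; i < numStreams; i++) {
    // every createStream() call starts its own receiver task
    kafkaStreams.add(KafkaUtils.createStream(jsc,
            "192.168.126.111:2181,192.168.126.112:2181,192.168.126.113:2181",
            "MyFiestConsumerGroup", topicConsumerConcurrency, StorageLevel.MEMORY_AND_DISK_SER_2()));
}
// union all receiver streams into a single DStream for downstream processing
JavaPairDStream<String, String> unified =
        jsc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));

Note that with five receivers a master of local[2] no longer has enough cores; something like local[8] would be needed so that cores remain for processing.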



Direct mode

      Operates directly on Kafka's underlying data.

         Direct mode treats Kafka as Spark Streaming's underlying file system: instead of passively receiving data, it actively pulls it.

                    That is, in Direct mode there is no receiver task receiving data.

         Consumer offsets are not managed through ZooKeeper either; Spark Streaming manages them itself, keeping them in memory by default.

         If checkpointing is configured they are saved to the checkpoint directory (as sketched below); ZooKeeper-based management can of course also be implemented by hand.
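
A minimal sketch of checkpoint-based driver recovery in Direct mode (the path is illustrative, and createContext() is a hypothetical factory that builds the whole pipeline and calls checkpoint(checkpointDir) on the new context):

final String checkpointDir = "hdfs://192.168.126.111:9000/spark/direct-checkpoint"; // illustrative
JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDir,
        new Function0<JavaStreamingContext>() {
            @Override
            public JavaStreamingContext call() throws Exception {
                // hypothetical factory: creates the context, the DStreams and the
                // output operations, then calls checkpoint(checkpointDir) on it
                return createContext(checkpointDir);
            }
        });
jsc.start();
jsc.awaitTermination();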

Parallelism

        Data is read directly from the source, so the RDD's partitions correspond one-to-one to the partitions of the Kafka topic;

            hence the parallelism is tied to the topic's partition count.
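
If more parallelism is needed than the topic has partitions, one option is to repartition after reading (a sketch assuming a lines stream like the one created in the example below; note that repartition introduces a shuffle):

// redistribute records across more partitions before expensive downstream work;
// only worthwhile when the processing cost outweighs the added shuffle
JavaPairDStream<String, String> widened = lines.repartition(40); // 40 is illustrative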

Integration method:

    Create it via KafkaUtils.createDirectStream(), passing in the broker addresses.

  /**
   * Create an input stream that directly pulls messages from Kafka Brokers
   * without using any receiver. This stream can guarantee that each message
   * from Kafka is included in transformations exactly once (see points below).
   *
   * Points to note:
   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
   *    You can access the offsets used in each batch from the generated RDDs (see
   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
   *    in the [[StreamingContext]]. The information on consumed offset can be
   *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
   *  - End-to-end semantics: This stream ensures that every records is effectively received and
   *    transformed exactly once, but gives no guarantees on whether the transformed data are
   *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
   *    that the output operation is idempotent, or use transactions to output records atomically.
   *    See the programming guide for more details.
   *
   * @param jssc JavaStreamingContext object
   * @param keyClass Class of the keys in the Kafka records
   * @param valueClass Class of the values in the Kafka records
   * @param keyDecoderClass Class of the key decoder
   * @param valueDecoderClass Class type of the value decoder
   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
   *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
   *   host1:port1,host2:port2 form.
   *   If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
   *   to determine where the stream starts (defaults to "largest")
   * @param topics Names of the topics to consume
   * @tparam K type of Kafka message key
   * @tparam V type of Kafka message value
   * @tparam KD type of Kafka message key decoder
   * @tparam VD type of Kafka message value decoder
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
      jssc: JavaStreamingContext, // the context
      keyClass: Class[K],  // class of the Kafka message key (e.g. String)
      valueClass: Class[V], // class of the message value
      keyDecoderClass: Class[KD], // key decoder class
      valueDecoderClass: Class[VD], // value decoder class
      kafkaParams: JMap[String, String], // parameters, e.g. {"metadata.broker.list": "host:port,..."}
      topics: JSet[String]  // set of topics to consume
    ): JavaPairInputDStream[K, V] = {
    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    createDirectStream[K, V, KD, VD](
      jssc.ssc,
      Map(kafkaParams.asScala.toSeq: _*),
      Set(topics.asScala.toSeq: _*)
    )
  }

Example:

JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));
		
Map<String, String> kafkaParameters = new HashMap<String, String>();
// broker addresses (NOT ZooKeeper servers)
kafkaParameters.put("metadata.broker.list", "192.168.126.111:9092,192.168.126.112:9092,192.168.126.113:9092");
		
HashSet<String> topics = new HashSet<String>();
// topic to consume
topics.add("Abgelababy");

JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(jsc,
		String.class, // key class (the Kafka message key)
		String.class, // value class
		StringDecoder.class, // key decoder
		StringDecoder.class, // value decoder
		kafkaParameters,
		topics);
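
As the Scaladoc above notes, the offsets consumed in each batch can be read from the generated RDDs via HasOffsetRanges, e.g. to push them to ZooKeeper for monitoring tools. A sketch (the printing is illustrative; requires the HasOffsetRanges/OffsetRange, JavaPairRDD, and VoidFunction imports):

lines.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>() {
    @Override
    public void call(JavaPairRDD<String, String> rdd) throws Exception {
        // the RDDs produced by the direct stream implement HasOffsetRanges
        OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        for (OffsetRange r : ranges) {
            System.out.println(r.topic() + " partition " + r.partition()
                    + " offsets [" + r.fromOffset() + ", " + r.untilOffset() + ")");
        }
    }
});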


