一、概述

   上次写这篇文章文章的时候,Spark还是1.x,kafka还是0.8x版本,转眼间spark到了2.x,kafka也到了2.x,存储offset的方式也发生了改变,笔者根据上篇文章和网上文章,将offset存储到Redis,既保证了并发也保证了数据不丢失,经过测试,有效。

二、使用场景

Spark Streaming实时消费kafka数据的时候,程序停止或者Kafka节点挂掉会导致数据丢失,Spark Streaming也没有设置CheckPoint(据说比较鸡肋,虽然可以保存Direct方式的offset,但是可能会导致频繁写HDFS占用IO),所以每次出现问题的时候,重启程序,而程序的消费方式是Direct,所以在程序down掉的这段时间Kafka上的数据是消费不到的,虽然可以设置offset为smallest,但是会导致重复消费,重新overwrite hive上的数据,但是不允许重复消费的场景就不能这样做。

三、原理阐述

在Spark Streaming中消费 Kafka 数据的时候,有两种方式分别是 :

1.基于 Receiver-based 的 createStream 方法。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。本文对此方式不研究,有兴趣的可以自己实现,个人不喜欢这个方式。KafkaUtils.createStream

2.Direct Approach (No Receivers) 方式的 createDirectStream 方法,但是第二种使用方式中  kafka 的 offset 是保存在 checkpoint 中的,如果程序重启的话,会丢失一部分数据,我使用的是这种方式。KafkaUtils.createDirectStream。本文将用代码说明如何将 kafka 中的 offset 保存到 Redis 中,以及如何从 Redis 中读取已存在的 offset。参数auto.offset.reset为latest的时候程序才会读取redis的offset。

四、实现代码

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._

import scala.collection.JavaConverters._
import scala.util.Try

/**
  * Created by chouyarn of BI on 2018/8/21
  */
object KafkaUtilsRedis {
  /**
    * 根据groupId保存offset
    * @param ranges
    * @param groupId
    */
  def storeOffset(ranges: Array[OffsetRange], groupId: String): Unit = {
    for (o <- ranges) {
      val key = s"bi_kafka_offset_${groupId}_${o.topic}_${o.partition}"
      val value = o.untilOffset
      JedisUtil.set(key, value.toString)
    }
  }

  /**
    * 根据topic,groupid获取offset
    * @param topics
    * @param groupId
    * @return
    */
  def getOffset(topics: Array[String], groupId: String): (Map[TopicPartition, Long], Int) = {
    val fromOffSets = scala.collection.mutable.Map[TopicPartition, Long]()

    topics.foreach(topic => {
      val keys = JedisUtil.getKeys(s"bi_kafka_offset_${groupId}_${topic}*")
      if (!keys.isEmpty) {
        keys.asScala.foreach(key => {
          val offset = JedisUtil.get(key)
          val partition = Try(key.split(s"bi_kafka_offset_${groupId}_${topic}_").apply(1)).getOrElse("0")
          fromOffSets.put(new TopicPartition(topic, partition.toInt), offset.toLong)
        })
      }
    })
    if (fromOffSets.isEmpty) {
      (fromOffSets.toMap, 0)
    } else {
      (fromOffSets.toMap, 1)
    }
  }

  /**
    * 创建InputDStream,如果auto.offset.reset为latest则从redis读取
    * @param ssc
    * @param topic
    * @param kafkaParams
    * @return
    */
  def createStreamingContextRedis(ssc: StreamingContext, topic: Array[String],
                                  kafkaParams: Map[String, Object]): InputDStream[ConsumerRecord[String, String]] = {
    var kafkaStreams: InputDStream[ConsumerRecord[String, String]] = null
    val groupId = kafkaParams.get("group.id").get
    val (fromOffSet, flag) = getOffset(topic, groupId.toString)
    val offsetReset = kafkaParams.get("auto.offset.reset").get
    if (flag == 1 && offsetReset.equals("latest")) {
      kafkaStreams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams, fromOffSet))
    } else {
      kafkaStreams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe(topic, kafkaParams))
    }
    kafkaStreams
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("offSet Redis").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(60))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "group.id" -> "binlog.test.rpt_test_1min",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "session.timeout.ms" -> "20000",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer]
    )
    val topic = Array("binlog.test.rpt_test", "binlog.test.hbase_test", "binlog.test.offset_test")
    val groupId = "binlog.test.rpt_test_1min"
    val lines = createStreamingContextRedis(ssc, topic, kafkaParams)
    lines.foreachRDD(rdds => {
      if (!rdds.isEmpty()) {
        println("##################:" + rdds.count())
      }
      storeOffset(rdds.asInstanceOf[HasOffsetRanges].offsetRanges, groupId)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

五、总结

根据不同的groupid来保存不同的offset,支持多个topic

版权声明:本文为ChouYarn原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/ChouYarn/p/9512102.html