Managing Kafka Offsets in Spark Streaming

By timebusker on March 12, 2019

Spark Streaming's built-in offset management relies on checkpointing: the state of every batch is persisted to HDFS, so after a machine failure or a program crash the next start can read the RDD state at the time of the failure from the checkpoint directory and continue from the last processed data. The biggest drawback of checkpointing is that a new version of the jar cannot reuse the serialized state written by the old version, so a code upgrade cannot transition smoothly between the two versions; the result is either lost data or duplicated data.
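
For reference, a minimal sketch of the checkpoint-based approach is shown below (the checkpoint directory, source host, and port are hypothetical). On restart, StreamingContext.getOrCreate deserializes the previous DStream graph from the checkpoint, and that deserialization is exactly the step that breaks after a jar upgrade.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoveryDemo {

  val checkpointDir = "hdfs:///spark/checkpoint/demo" // hypothetical HDFS path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointRecoveryDemo")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir) // batch metadata, including Kafka offsets, is persisted here
    val lines = ssc.socketTextStream("localhost", 9999) // hypothetical source, for illustration only
    lines.count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On a clean start this calls createContext(); on restart it rebuilds the context
    // from the serialized checkpoint, which fails if the application jar has changed.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}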

The common solution is to manage the offsets yourself when integrating Spark Streaming with Kafka: writing your own offset-management code simply means persisting each batch's offsets to an external storage system, as sketched below.
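
Reduced to its essentials, the pattern looks roughly like the following sketch. The OffsetStore trait is a hypothetical abstraction over whatever external store you choose (ZooKeeper, Redis, HBase, a relational database, and so on); the rest of this post implements the same pattern concretely with ZooKeeper as the store.

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.HasOffsetRanges

object OffsetPattern {

  // Hypothetical external storage interface: read() is consulted once at startup to build
  // the fromOffsets passed to createDirectStream, save() is called after every batch.
  trait OffsetStore {
    def read(topic: String): Option[Map[TopicAndPartition, Long]]
    def save(offsets: Map[TopicAndPartition, Long]): Unit
  }

  // After each batch is processed, extract the offset ranges on the driver and persist them.
  def saveAfterEachBatch(stream: DStream[(String, String)], store: OffsetStore): Unit = {
    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // ... business logic runs here ...
        val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          .map(r => TopicAndPartition(r.topic, r.partition) -> r.untilOffset)
          .toMap
        store.save(offsets)
      }
    }
  }
}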

One issue still requires attention: when the topic's partitions are adjusted (for example, new partitions are added), the manually maintained offsets must be updated accordingly; the readOffsets method below handles this case.

Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>demo</groupId>
    <artifactId>direct-stream-offset-to-zk</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.1.0</spark.version>
        <jedis.version>2.9.0</jedis.version>
        <spark.streaming.version>2.1.0</spark.streaming.version>
        <spark.kafka.version>2.0.2</spark.kafka.version>
        <kafka.version>0.9.0.0</kafka.version>
        <hbase.version>1.3.0</hbase.version>
        <config.version>1.2.1</config.version>
        <guava.version>18.0</guava.version>
        <joda-time.version>2.9.4</joda-time.version>
        <fast.json>1.2.31</fast.json>
        <scala.logging.version>3.1.0</scala.logging.version>
        <logback.version>1.1.7</logback.version>
        <json-simple.version>1.1.1</json-simple.version>
        <elasticsearch.version>2.3.4</elasticsearch.version>
        <zk.client>0.3</zk.client>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>${zk.client}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>${config.version}</version>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>${joda-time.version}</version>
        </dependency>
        <!-- JSON parsing framework -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fast.json}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.streaming.version}</version>
        </dependency>
    </dependencies>
    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.1</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>2.0.2</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Offset management utility class

import kafka.common.TopicAndPartition
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.HasOffsetRanges

/**
  *
  * Responsible for reading and saving Kafka offsets
  *
  * Created by QinDongLiang on 2017/11/28.
  */
object KafkaOffsetManager {


  lazy val log = org.apache.log4j.LogManager.getLogger("KafkaOffsetManager")

  /*** Read the offsets stored in ZooKeeper; if they exist, return the partition-to-offset map,
    * otherwise return None
    * @param zkClient  ZooKeeper client connection
    * @param zkOffsetPath   path of the offsets in ZooKeeper
    * @param topic    topic name
    * @return  offset Map or None
    */
  def readOffsets(zkClient: ZkClient, zkOffsetPath: String, topic: String): Option[Map[TopicAndPartition, Long]] = {
    //(offset string, zk metadata)
    val (offsetsRangesStrOpt, _) = ZkUtils.readDataMaybeNull(zkClient, zkOffsetPath)//read the offsets from zk
    offsetsRangesStrOpt match {
      case Some(offsetsRangesStr) =>
        //the latest partitions of this topic as known to zk
        val latest_partitions = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic)).get(topic).get
        var offsets = offsetsRangesStr.split(",")//split into an array on commas
          .map(s => s.split(":"))//split each entry into partition and offset on the colon
          .map { case Array(partitionStr, offsetStr) => (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }//build the final format
          .toMap //return a Map

        //partitions have been added to the topic
        if (offsets.size < latest_partitions.size) {
          //partition ids of the previously stored offsets
          val old_partitions = offsets.keys.map(p => p.partition).toArray
          //the newly added partitions, obtained by set difference
          val add_partitions = latest_partitions.diff(old_partitions)
          if (add_partitions.size > 0) {
            log.warn("Detected newly added Kafka partitions: " + add_partitions.mkString(","))
            add_partitions.foreach(partitionId => {
              offsets += (TopicAndPartition(topic, partitionId) -> 0L)
              log.warn("New partition id " + partitionId + " added....")
            })
          }
        } else {
          log.warn("No newly added Kafka partitions found: " + latest_partitions.mkString(","))
        }
        Some(offsets)//return the Map
      case None =>
        None//nothing stored yet, return None
    }
  }

  /****
    * Save the offsets of each batch's RDD to ZooKeeper
    * @param zkClient ZooKeeper client connection
    * @param zkOffsetPath   path of the offsets in ZooKeeper
    * @param rdd     the RDD of the current batch
    */
  def saveOffsets(zkClient: ZkClient, zkOffsetPath: String, rdd: RDD[_]): Unit = {
    //cast the rdd to obtain its Array[OffsetRange]
    val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    //serialize each OffsetRange into the string format stored in zk:  partition1:offset1,partition2:offset2,......
    val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.untilOffset}").mkString(",")
    log.debug("Saving offsets:  " + offsetsRangesStr)
    //write the resulting string to zk
    ZkUtils.updatePersistentPath(zkClient, zkOffsetPath, offsetsRangesStr)
  }

  class Stopwatch {
    private val start = System.currentTimeMillis()
    def get():Long = (System.currentTimeMillis() - start)
  }
}

Test

import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

import kafka.api.OffsetRequest
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.spark_project.jetty.server.{Request, Server}
import org.spark_project.jetty.server.handler.{AbstractHandler, ContextHandler}

/**
  * Created by QinDongLiang on 2017/11/28.
  */
object SparkDirectStreaming {


  val log = org.apache.log4j.LogManager.getLogger("SparkDirectStreaming")

  /***
    * Create the StreamingContext
    * @return
    */
  def createStreamingContext(): StreamingContext = {

    val isLocal = true//whether to run in local mode
    val firstReadLatest = true//whether the very first start consumes from the latest offsets

    val sparkConf = new SparkConf().setAppName("Direct Kafka Offset to Zookeeper")
    if (isLocal)  sparkConf.setMaster("local[1]") //local mode
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")//graceful shutdown
    sparkConf.set("spark.streaming.backpressure.enabled", "true")//enable backpressure
    sparkConf.set("spark.streaming.backpressure.initialRate", "5000")//maximum rate for the first batch
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "2000")//maximum number of records read from each kafka partition per second

    var kafkaParams = Map[String, String]("bootstrap.servers" -> "192.168.10.6:9092,192.168.10.7:9092,192.168.10.8:9092")//build the kafkaParams
    if (firstReadLatest) kafkaParams += ("auto.offset.reset" -> OffsetRequest.LargestTimeString)//consume from the latest offsets
    //create the zkClient; the last argument should be ZKStringSerializer, otherwise the offsets written to zk are garbled
    val zkClient = new ZkClient("192.168.10.6:2181,192.168.10.7:2181,192.168.10.8:2181", 30000, 30000, ZKStringSerializer)
    val zkOffsetPath = "/sparkstreaming/20171128"//zk path for the offsets
    val topicsSet = "dc_test".split(",").toSet//topic names

    val ssc = new StreamingContext(sparkConf, Seconds(10))//create the StreamingContext with the batch interval in seconds

    val rdds: InputDStream[(String, String)] = createKafkaStream(ssc, kafkaParams, zkClient, zkOffsetPath, topicsSet)

    //start processing the data
    rdds.foreachRDD(rdd => {

      if (!rdd.isEmpty()) {//only process non-empty rdds, skip empty ones

        //iterate over the partitions; this code runs on the executors
        rdd.foreachPartition(partitions => {

          //if no broadcast variable is used, initialize connection resources here,
          //e.g. database connections, hbase, elasticsearch, solr, etc.

          //iterate over the messages of each partition
          partitions.foreach(msg => {
            log.info("Message read: " + msg)
            // process(msg)  //process each message
          })
        })
        //update the offsets of this batch in zk; note that this code runs on the driver
        KafkaOffsetManager.saveOffsets(zkClient, zkOffsetPath, rdd)
      }
    })
    // return the StreamingContext
    ssc
  }


  /****
    * Start the daemon jetty server
    * @param port the port exposed externally
    * @param ssc StreamingContext
    */
  def daemonHttpServer(port: Int, ssc: StreamingContext) = {
    val server = new Server(port)
    val context = new ContextHandler();
    context.setContextPath("/close");
    context.setHandler(new CloseStreamHandler(ssc))
    server.setHandler(context)
    server.start()
  }

  /*** Accept an http request to gracefully stop the stream
    * @param ssc  StreamingContext
    */
  class CloseStreamHandler(ssc: StreamingContext) extends AbstractHandler {
    override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit = {
      log.warn("Starting shutdown......")
      ssc.stop(true, true)//graceful shutdown
      response.setContentType("text/html; charset=utf-8");
      response.setStatus(HttpServletResponse.SC_OK);
      val out = response.getWriter();
      out.println("close success");
      baseRequest.setHandled(true);
      log.warn("Shutdown complete.....")
    }
  }


  /***
    * Periodically check for a marker file to decide whether to stop the streaming application
    * @param ssc StreamingContext
    */
  def stopByMarkFile(ssc: StreamingContext): Unit = {
    val intervalMills = 10 * 1000 // check for the marker file every 10 seconds
    var isStop = false
    val hdfs_file_path = "/spark/streaming/stop" //if this marker file exists, stop the application
    while (!isStop) {
      isStop = ssc.awaitTerminationOrTimeout(intervalMills)
      if (!isStop && isExistsMarkFile(hdfs_file_path)) {
        log.warn("Stopping the spark streaming application in 2 seconds.....")
        Thread.sleep(2000)
        ssc.stop(true, true)
      }

    }
  }

  /***
    * Check whether the marker file exists
    * @param hdfs_file_path  path of the marker file
    * @return
    */
  def isExistsMarkFile(hdfs_file_path: String): Boolean = {
    val conf = new Configuration()
    val path = new Path(hdfs_file_path)
    val fs = path.getFileSystem(conf);
    fs.exists(path)
  }





  def main(args: Array[String]): Unit = {
    // create the StreamingContext
    val ssc = createStreamingContext()
    // start the job
    ssc.start()
    // start the daemon that accepts stop requests
    // option 1: graceful shutdown triggered over HTTP
    daemonHttpServer(5555, ssc)
    // option 2: graceful shutdown triggered by scanning for an HDFS marker file
    // stopByMarkFile(ssc)
    // wait for the job to terminate
    ssc.awaitTermination()
  }

  /****
    *
    * @param ssc  StreamingContext
    * @param kafkaParams  kafka configuration parameters
    * @param zkClient  ZooKeeper client connection
    * @param zkOffsetPath path of the offsets in zk
    * @param topics     the topics to process
    * @return   InputDStream[(String, String)] the input stream
    */
  def createKafkaStream(ssc: StreamingContext,
                        kafkaParams: Map[String, String],
                        zkClient: ZkClient,
                        zkOffsetPath: String,
                        topics: Set[String]): InputDStream[(String, String)] = {
    //currently only the offsets of a single topic are handled; read the offset string from zk
    val zkOffsetData = KafkaOffsetManager.readOffsets(zkClient, zkOffsetPath, topics.last)

    val kafkaStream = zkOffsetData match {
      case None =>  //no offsets in zk means this is the very first start of the application
        log.info("First start, no offsets found in zk, consuming from the latest offsets by default")
        //create the DirectStream from the latest offsets
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
      case Some(lastStopOffset) =>
        log.info("Offsets found in zk, resuming consumption from where the last run stopped......")
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
        //create the DirectStream from the offsets recorded at the last stop
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, lastStopOffset, messageHandler)
    }
    kafkaStream//return the created kafkaStream
  }
}
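
To trigger a graceful shutdown with the HTTP approach, send a request to the /close context on port 5555 of the driver node, for example by opening http://driver-host:5555/close in a browser; the handler responds with "close success" once the context has stopped. With the marker-file approach, enable stopByMarkFile in main instead and create the file /spark/streaming/stop on HDFS; the scan loop checks every 10 seconds and stops the context about 2 seconds after it sees the file.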