Kafka_基础知识

By timebusker on January 17, 2018

image

Apache Kafka 概述

Kafka 是一个高吞吐量、分布式的发布—订阅消息系统。据 Kafka 官方网站介绍,当前的 Kafka 已经定位为一个分布式流式处理平台(a distributed streaming platform), 它最初由 LinkedIn 公司开发,后来成为 Apache 项目的一部分。Kafka 核心模块使用 Scala 语言开发,支持多语言 (如 Java、C/C++、Python、Go、Erlang、Node.js 等)客户端,它以可水平扩展和具有高吞吐量等特性而被广泛使用。 目前越来越多的开源分布式处理系统(如 Flume、Apache Storm、Spark、Flink 等)支持与 Kafka 集成。

Kafka 是一款开源的、轻量级的、分布式、可分区和具有复制备份的(Replicated)、基于 ZooKeeper 协调管理的分布式流平台的功能强大的消息系统。 与传统的消息系统相比,Kafka 能够很好地处理活跃的流数据,使得数据在各个子系统中高性能、低延迟地不停流转。 image

Kafka 设计动机

Kafka 的设计初衷是使 Kafka 能够成为统一、实时处理大规模数据的平台。为了达到这个目标,Kafka 必须支持以下几个应用场景。

(1)具有高吞吐量来支持诸如实时的日志集这样的大规模事件流

(2)能够很好地处理大量积压的数据,以便能够周期性地加载离线数据进行处理

(3)能够低延迟地处理传统消息应用场景

(4)能够支持分区、分布式,实时地处理消息,同时具有容错保障机制

Kafka 特性

消息持久化

Kafka 高度依赖于文件系统来存储和缓存消息。说到文件系统,大家普遍认为磁盘读写慢,依赖于文件系统进行存储和缓存消息势必在性能上会大打折扣, 其实文件系统存储速度快慢一定程度上也取决于我们对磁盘的用法。据 Kafka 官方网站介绍:6块7200r/min SATA RAID-5 阵列的磁盘线性写的速度为600 MB/s, 而随机写的速度为100KB/s,线性写的速度约是随机写的6000多倍。由此看来磁盘的快慢取决于我们是如何去应用磁盘。 加之现代的操作系统提供了预读(read-ahead)延迟写(write-behind)技术,使得磁盘的写速度并不是大家想象的那么慢。 同时,由于 Kafka 是基于 JVM(Java Virtual Machine)的,而 Java 对象内存消耗非常高,且随着 Java 对象的增加 JVM 的垃圾回收也越来越频繁和繁琐, 这些都加大了内存的消耗。鉴于以上因素,使用文件系统和依赖于页缓存(page cache)的存储比维护一个内存的存储或是应用其他结构来存储消息更有优势,因此 Kafka 选择以文件系统来存储数据。

消息系统数据持久化一般采用为每个消费者队列提供一个 B 树或其他通用的随机访问数据结构来维护消息的元数据, B树操作的时间复杂度为O(log n),O(log n)的时间复杂度可以看成是一个常量时间,而且B树可以支持各种各样的事务性和非事务性语义消息的传递。 尽管B树具有这些优点,但这并不适合磁盘操作。目前的磁盘寻道时间一般在10ms以内,对一块磁盘来说,在同一时刻只能有一个磁头来读写磁盘, 这样在并发IO能力上就有问题。同时,对树结构性能的观察结果表明:其性能会随着数据的增长而线性下降。鉴于消息系统本身的作用考虑, 数据的持久化队列可以建立在简单地对文件进行追加的实现方案上。因为是顺序追加,所以Kafka在设计上是采用时间复杂度O(1)的磁盘结构,它提供了常量时间的性能, 即使是存储海量的信息(TB 级)也如此,性能和数据的大小关系也不大,同时 Kafka 将数据持久化到磁盘上,这样只要磁盘空间足够大数据就可以一直追加, 而不会像一般的消息系统在消息被消费后就删除掉,Kafka 提供了相关配置让用户自己决定消息要保存多久,这样为消费者提供了更灵活的处理方式,因此 Kafka 能够在没有性能损失的情况下提供一般消息系统不具备的特性。

正是由于 Kafka 将消息进行持久化,使得 Kafka 在机器重启后,已存储的消息可继续恢复使用。同时 Kafka 能够很好地支持在线或离线处理、与其他存储及流处理框架的集成。

高吞吐量

高吞吐量是 Kafka 设计的主要目标,Kafka 将数据写到磁盘,充分利用磁盘的顺序读写。同时,Kafka 在数据写入及数据同步采用了零拷贝(zero-copy)技术, 采用 sendFile() 函数调用,sendFile() 函数是在两个文件描述符之间直接传递数据,完全在内核中操作,从而避免了内核缓冲区与用户缓冲区之间数据的拷贝, 操作效率极高。Kafka 还支持数据压缩及批量发送,同时 Kafka 将每个主题划分为多个分区,这一系列的优化及实现方法使得 Kafka 具有很高的吞吐量。 经大多数公司对 Kafka 应用的验证,Kafka 支持每秒数百万级别的消息。

扩展性

Kafka 要支持对大规模数据的处理,就必须能够对集群进行扩展,分布式必须是其特性之一,这样就可以将多台廉价的 PC 服务器搭建成一个大规模的消息系统。 Kafka 依赖 ZooKeeper 来对集群进行协调管理,这样使得 Kafka 更加容易进行水平扩展,生产者、消费者和代理都为分布式,可配置多个。 同时在机器扩展时无需将整个集群停机,集群能够自动感知,重新进行负责均衡及数据复制。

多客户端支持

Kafka 核心模块用 Scala 语言开发,但 Kafka 支持不同语言开发生产者和消费者客户端应用程序。 0.8.2 之后的版本增加了 Java 版本的客户端实现,0.10 之后的版本已废弃 Scala 语言实现的 Producer 及 Consumer, 默认使用 Java 版本的客户端。Kafka 提供了多种开发语言的接入,如 Java、Scala、C、C++、Python、Go、Erlang、Ruby、Node.js 等, 感兴趣的读者可以详见这里。同时,Kafka 支持多种连接器(Connector)的接入,也提供了 Connector API 供开发者调用。 Kafka 与当前主流的大数据框架都能很好地集成,如 Flume、Hadoop、HBase、Hive、Spark、Storm 等。

Kafka Streams

Kafka 在0.10之后版本中引入 Kafak Streams。Kafka Streams 是一个用 Java 语言实现的用于流处理的 jar 文件。

安全机制

Kafka 支持以下几种安全措施:

  • 通过 SSL 和 SASL(Kerberos),SASL/PLAIN 验证机制支持生产者、消费者与代理连接时的身份认证;
  • 支持代理与 ZooKeeper 连接身份验证;
  • 通信时数据加密;
  • 客户端读、写权限认证;
  • Kafka 支持与外部其他认证授权服务的集成。
数据备份

Kafka 可以为每个主题指定副本数,对数据进行持久化备份,这可以一定程度上防止数据丢失,提高可用性。

轻量级

Kafka 的代理是无状态的,即代理不记录消息是否被消费,消费偏移量的管理交由消费者自己或组协调器来维护。 同时集群本身几乎不需要生产者和消费者的状态信息,这就使得 Kafka 非常轻量级,同时生产者和消费者客户端实现也非常轻量级。

消息压缩

Kafka 支持 Gzip、Snappy、LZ4 这3种压缩方式,通常把多条消息放在一起组成 MessageSet,然后再把 MessageSet 放到一条消息里面去,从而提高压缩比率进而提高吞吐量。

Kafka 应用场景

消息系统或是说消息队列中间件是当前处理大数据一个非常重要的组件,用来解决应用解耦、异步通信、流量控制等问题, 从而构建一个高效、灵活、消息同步和异步传输处理、存储转发、可伸缩和最终一致性的稳定系统。 当前比较流行的消息中间件有 Kafka、RocketMQ、RabbitMQ、ZeroMQ、ActiveMQ、MetaMQ、Redis 等, 这些消息中间件在性能及功能上各有所长。如何选择一个消息中间件取决于我们的业务场景、系统运行环境、开发及运维人员对消息中件间掌握的情况等。 我认为在下面这些场景中,Kafka 是一个不错的选择。

消息系统

Kafka 作为一款优秀的消息系统,具有高吞吐量、内置的分区、备份冗余分布式等特点,为大规模消息处理提供了一种很好的解决方案。

应用监控

利用 Kafka 采集应用程序和服务器健康相关的指标,如 CPU 占用率、IO、内存、连接数、TPS、QPS 等,然后将指标信息进行处理, 从而构建一个具有监控仪表盘、曲线图等可视化监控系统。例如,很多公司采用 Kafka 与 ELK(ElasticSearch、Logstash 和 Kibana)整合构建应用服务监控系统。

网站用户行为追踪

为了更好地了解用户行为、操作习惯,改善用户体验,进而对产品升级改进,将用户操作轨迹、内容等信息发送到 Kafka 集群上, 通过 Hadoop、Spark 或 Strom 等进行数据分析处理,生成相应的统计报告,为推荐系统推荐对象建模提供数据源,进而为每个用户进行个性化推荐。

流处理

需要将已收集的流数据提供给其他流式计算框架进行处理,用 Kafka 收集流数据是一个不错的选择,而且当前版本的 Kafka 提供了 Kafka Streams 支持对流数据的处理。

持久性日志

Kafka 可以为外部系统提供一种持久性日志的分布式系统。日志可以在多个节点间进行备份,Kafka 为故障节点数据恢复提供了一种重新同步的机制。 同时,Kafka 很方便与 HDFS 和 Flume 进行整合,这样就方便将 Kafka 采集的数据持久化到其他外部系统。

Apache Kafka 基础

  • Kafka将消息以topic为单位进行归纳。将向Kafka topic发布消息的程序成为Producers,将预订topics并消费消息的程序成为Consumer
  • Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个Broker
  • Producers通过网络将消息发送到Kafka集群,集群向消费者提供消息;客户端和服务端通过TCP协议通信。

image

主题(Topics)

image
Kafka提供的一个抽象概念:topic。一个topic是对一组消息的归纳。对每个topic,Kafka 对它的日志进行了分区,如下图所示:
image
每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。 分区中的每个消息都有一个连续的序列号叫做offset,用来在分区中唯一的标识这个消息。

在一个可配置的时间段内,Kafka集群保留所有发布的消息,不管这些消息有没有被消费。 比如,如果消息的保存策略被设置为2天,那么在一个消息被发布的两天时间内,它都是可以被消费的。 之后它将被丢弃以释放空间。Kafka的性能是和数据量无关的常量级的,所以保留太多的数据并不是问题。

消息

消息是 Kafka 通信的基本单位,由一个固定长度的消息头和一个可变长度的消息体构成。在老版本中,每一条消息称为 Message; 在由 Java 重新实现的客户端中,每一条消息称为 Record

分区和副本

Kafka 将一组消息归纳为一个主题,而每个主题又被分成一个或多个分区(Partition)。每个分区由一系列有序、不可变的消息组成,是一个有序队列。

每个分区在物理上对应为一个文件夹,分区的命名规则为主题名称后接“—”连接符,之后再接分区编号,分区编号从0开始,编号最大值为分区的总数减1。 每个分区又有一至多个副本(Replica),分区的副本分布在集群的不同代理上,以提高可用性。从存储角度上分析,分区的每个副本在逻辑上抽象为一个日志(Log)对象, 即分区的副本与日志对象是一一对应的。每个主题对应的分区数可以在 Kafka 启动时所加载的配置文件中配置,也可以在创建主题时指定。当然,客户端还可以在主题创建后修改主题的分区数。

分区使得 Kafka 在并发处理上变得更加容易,理论上来说,分区数越多吞吐量越高,但这要根据集群实际环境及业务场景而定。同时,分区也是Kafka保证消息被顺序消费以及对消息进行负载均衡的基础

Kafka 只能保证一个分区之内消息的有序性,并不能保证跨分区消息的有序性。每条消息被追加到相应的分区中,是顺序写磁盘,因此效率非常高,这是 Kafka 高吞吐率的一个重要保证。 同时与传统消息系统不同的是,Kafka 并不会立即删除已被消费的消息,由于磁盘的限制消息也不会一直被存储(事实上这也是没有必要的), 因此 Kafka 提供两种删除老数据的策略,一是基于消息已存储的时间长度,二是基于分区的大小。

Leader 副本和 Follower 副本

由于Kafka副本的存在,就需要保证一个分区的多个副本之间数据的一致性,Kafka会选择该分区的一个副本作为Leader副本, 而该分区其他副本即为Follower副本,只有Leader副本才负责处理客户端读/写请求Follower副本从Leader副本同步数据。 如果没有Leader副本,那就需要所有的副本都同时负责读/写请求处理,同时还得保证这些副本之间数据的一致性, 假设有n个副本则需要有n×n条通路来同步数据,这样数据的一致性和有序性就很难保证。

引入Leader副本后客户端只需与Leader副本进行交互,这样数据一致性及顺序性就有了保证。 Follower副本从Leader副本同步消息,对于n个副本只需n−1条通路即可,这样就使得系统更加简单而高效。 副本FollowerLeader的角色并不是固定不变的,如果Leader失效,通过相应的选举算法将从其他Follower副本中选出新的Leader副本。

偏移量

任何发布到分区的消息会被直接追加到日志文件(分区目录下以“.log”为文件名后缀的数据文件)的尾部,而每条消息在日志文件中的位置都会对应一个按序递增的偏移量。 偏移量是一个分区下严格有序的逻辑值,它并不表示消息在磁盘上的物理位置。由于Kafka几乎不允许对消息进行随机读写,因此Kafka 并没有提供额外索引机制到存储偏移量, 也就是说并不会给偏移量再提供索引。消费者可以通过控制消息偏移量来对消息进行消费,如消费者可以指定消费的起始偏移量。为了保证消息被顺序消费, 消费者已消费的消息对应的偏移量也需要保存。需要说明的是,消费者对消息偏移量的操作并不会影响消息本身的偏移量。旧版消费者将消费偏移量保存到ZooKeeper当中, 而新版消费者是将消费偏移量保存到Kafka内部一个主题当中。当然,消费者也可以自己在外部系统保存消费偏移量,而无需保存到Kafka中。

日志段

一个日志又被划分为多个日志段(LogSegment),日志段是 Kafka 日志对象分片的最小单位。与日志对象一样,日志段也是一个逻辑概念, 一个日志段对应磁盘上一个具体日志文件和两个索引文件。日志文件是以“.log”为文件名后缀的数据文件,用于保存消息实际数据。 两个索引文件分别以“.index”和“.timeindex”作为文件名后缀,分别表示消息偏移量索引文件和消息时间戳索引文件。 image
image

代理

在 Kafka 基本体系结构中我们提到了 Kafka 集群。Kafka 集群就是由一个或多个 Kafka 实例构成,我们将每一个 Kafka 实例称为代理(Broker),通常也称代理为 Kafka 服务器(KafkaServer)。 在生产环境中 Kafka 集群一般包括一台或多台服务器,我们可以在一台服务器上配置一个或多个代理。每一个代理都有唯一的标识 id,这个 id 是一个非负整数。 在一个 Kafka 集群中,每增加一个代理就需要为这个代理配置一个与该集群中其他代理不同的 id,id 值可以选择任意非负整数即可,只要保证它在整个 Kafka 集群中唯一, 这个 id 就是代理的名字,也就是在启动代理时配置的 broker.id 对应的值,因此在本书中有时我们也称为 brokerId。 由于给每个代理分配了不同的 brokerId,这样对代理进行迁移就变得更方便,从而对消费者来说是透明的,不会影响消费者对消息的消费。

生产者

生产者(Producer)负责将消息发送给代理,也就是向 Kafka 代理发送消息的客户端。

消费者和消费组

发布消息通常有两种模式:队列模式(Queue)和发布-订阅模式(publish-subscribe)。队列模式中,Consumers可以同时从服务端读取消息,每个消息只被其中一个Consumer读到;发布-订阅模式中消息被广播到所有的Consumer中。

消费者(Comsumer)以拉取(pull)方式拉取数据,它是消费的客户端。在 Kafka 中每一个消费者都属于一个特定消费组(ConsumerGroup),我们可以为每个消费者指定一个消费组, 以 groupId 代表消费组名称,通过 group.id 配置设置。如果不指定消费组,则该消费者属于默认消费组 test-consumer-group。
image 同时,每个消费者也有一个全局唯一的 id,通过配置项 client.id 指定,如果客户端没有指定消费者的 id,Kafka 会自动为该消费者生成一个全局唯一的 id, 格式为 ${groupId}-${hostName}-${timestamp}-${UUID前8位字符}。同一个主题的一条消息只能被同一个消费组下某一个消费者消费,但不同消费组的消费者可同时消费该消息。 消费组是 Kafka 用来实现对一个主题消息进行广播和单播的手段,实现消息广播只需指定各消费者均属于不同的消费组,消息单播则只需让各消费者属于同一个消费组。

Consumer可以加入一个Consumer组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。 同一组中的Consumer可以在不同的程序中,也可以在不同的机器上。如果所有的Consumer都在一个组中, 这就成为了传统的队列模式,在各Consumer中实现负载均衡。 如果所有的Consumer都不在不同的组中,这就成为了发布-订阅模式,所有的消息都被分发到所有的Consumer中。 更常见的是,每个topic都有若干数量的Consumer组,每个组都是一个逻辑上的“订阅者”,为了容错和更好的稳定性,每个组由若干consumer组成。 这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个Consumer。
image
由两个机器组成的集群拥有4个分区 (P0-P3) 2个consumer组. A组有两个consumerB组有4个。

通过分区的概念,Kafka可以在多个Consumer组并发的情况下提供较好的有序性和负载均衡。 将每个分区分只分发给一个Consumer组,这样一个分区就只被这个组的一个Consumer消费,就可以顺序的消费这个分区的消息。 因为有多个分区,依然可以在多个Consumer组之间进行负载均衡。注意Consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费

Kafka只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。 如果需要topic中所有消息的有序性,那就只能让这个topic只有一个分区,当然也就只有一个consumer组消费它。

实际上每个Consumer唯一需要维护的数据是消息在日志中的位置(offset/偏移量)。 这个offsetconsumer来维护:一般情况下随着consumer不断的读取消息,这offset的值不断增加,但其实consumer可以以任意的顺序读取消息, 比如它可以将offset设置成为一个旧的值来重读之前的消息。 以上特点的结合,使Kafka consumers非常的轻量级:它们可以在不对集群和其他consumer造成影响的情况下读取消息。你可以使用命令行来”tail”消息而不会对其他正在消费消息的consumer造成影响。
image

分布式

每个分区在Kafka集群的若干服务中都有副本,这样这些持有副本的服务可以共同处理数据和请求,副本数量是可以配置的。副本使Kafka具备了容错能力。

每个分区都由一个服务器作为“leader”,零或若干服务器作为“followers”,leader负责处理消息的读和写,followers则去复制leader。 如果leader down了,followers中的一台则会自动成为leader。集群中的每个服务都会同时扮演两个角色:作为它所持有的一部分分区的leader, 同时作为其他分区的followers,这样集群就会据有较好的负载均衡。

ISR

Kafka 在 ZooKeeper 中动态维护了一个 ISR(In-sync Replica),即保存同步的副本列表,该列表中保存的是与 Leader 副本保持消息同步的所有副本对应的代理节点 id。 如果一个 Follower 副本宕机(本书用宕机来特指某个代理失效的情景,包括但不限于代理被关闭,如代理被人为关闭或是发生物理故障、心跳检测过期、网络延迟、进程崩溃等)或是落后太多, 则该 Follower 副本节点将从ISR列表中移除。

ZooKeeper

Kafka 利用 ZooKeeper 保存相应元数据信息,Kafka 元数据信息包括如代理节点信息、Kafka 集群信息、旧版消费者信息及其消费偏移量信息、 主题信息、分区状态信息、分区副本分配方案信息、动态配置信息等。Kafka 在启动或运行过程当中会在 ZooKeeper 上创建相应节点来保存元数据信息, Kafka 通过监听机制在这些节点注册相应监听器来监听节点元数据的变化,从而由 ZooKeeper 负责管理维护 Kafka 集群, 同时通过 ZooKeeper 我们能够很方便地对 Kafka 集群进行水平扩展及数据迁移。