
[TOC]
我们已经知道Kafka的集群由n个的broker所组成,每个broker就是一个kafka的实例或者称之为kafka的服务。其实控制器也是一个broker,控制器也叫leader broker。
他除了具有一般broker的功能外,还负责分区leader的选取,也就是负责选举partition的leader replica。
kafka每个broker启动的时候,都会实例化一个KafkaController,并将broker的id注册到zookeeper,集群在启动过程中,通过选举机制选举出其中一个broker作为leader,也就是前面所说的控制器。
包括集群启动在内,有三种情况触发控制器选举:
1、集群启动
2、控制器所在代理发生故障
3、zookeeper心跳感知,控制器与自己的session过期
按照惯例,先看图。我们根据下图来讲解集群启动时,控制器选举过程。
假设此集群有三个broker,同时启动。
(一)3个broker从zookeeper获取/controller临时节点信息。/controller存储的是选举出来的leader信息。此举是为了确认是否已经存在leader。
(二)如果还没有选举出leader,那么此节点是不存在的,返回-1。如果返回的不是-1,而是leader的json数据,那么说明已经有leader存在,选举结束。
(三)三个broker发现返回-1,了解到目前没有leader,于是均会触发向临时节点/controller写入自己的信息。最先写入的就会成为leader。
(四)假设broker 0的速度最快,他先写入了/controller节点,那么他就成为了leader。而broker1、broker2很不幸,因为晚了一步,他们在写/controller的过程中会抛出ZkNodeExistsException,也就是zk告诉他们,此节点已经存在了。
经过以上四步,broker 0成功写入/controller节点,其它broker写入失败了,所以broker 0成功当选leader。
此外zk中还有controller_epoch节点,存储了leader的变更次数,初始值为0,以后leader每变一次,该值+1。所有向控制器发起的请求,都会携带此值。如果控制器和自己内存中比较,请求值小,说明kafka集群已经发生了新的选举,此请求过期,此请求无效。如果请求值大于控制器内存的值,说明已经有新的控制器当选了,自己已经退位,请求无效。kafka通过controller_epoch保证集群控制器的唯一性及 *** 作的一致性。
由此可见,Kafka控制器选举就是看谁先争抢到/controller节点写入自身信息。
控制器的初始化,其实是初始化控制器所用到的组件及监听器,准备元数据。
前面提到过每个broker都会实例化并启动一个KafkaController。KafkaController和他的组件关系,以及各个组件的介绍如下图:
图中箭头为组件层级关系,组件下面还会再初始化其他组件。可见控制器内部还是有些复杂的,主要有以下组件:
1、ControllerContext,此对象存储了控制器工作需要的所有上下文信息,包括存活的代理、所有主题及分区分配方案、每个分区的AR、leader、ISR等信息。
2、一系列的listener,通过对zookeeper的监听,触发相应的 *** 作,**的框的均为listener
3、分区和副本状态机,管理分区和副本。
4、当前代理选举器ZookeeperLeaderElector,此选举器有上位和退位的相关回调方法。
5、分区leader选举器,PartitionLeaderSelector
6、主题删除管理器,TopicDeletetionManager
7、leader向broker批量通信的ControllerBrokerRequestBatch。缓存状态机处理后产生的request,然后统一发送出去。
8、控制器平衡 *** 作的KafkaScheduler,仅在broker作为leader时有效。
Kafka集群的一些重要信息都记录在ZK中,比如集群的所有代理节点、主题的所有分区、分区的副本信息(副本集、主副本、同步的副本集)。每个broker都有一个控制器,为了管理整个集群Kafka选利用zk选举模式,为整个集群选举一个“中央控制器”或”主控制器“,控制器其实就是一个broker节点,除了一般broker功能外,还具有分区首领选举功能。中央控制器管理所有节点的信息,并通过向ZK注册各种监听事件来管理整个集群节点、分区的leader的选举、再平衡等问题。外部事件会更新ZK的数据,ZK中的数据一旦发生变化,控制器都要做不同的响应处理。
故障转移其实就是leader所在broker发生故障,leader转移为其他的broker。转移的过程就是重新选举leader的过程。
重新选举leader后,需要为该broker注册相应权限,调用的是ZookeeperLeaderElector的onControllerFailover()方法。在这个方法中初始化和启动了一系列的组件来完成leader的各种 *** 作。具体如下,其实和控制器初始化有很大的相似度。
1、注册分区管理的相关监听器
2、注册主题管理的相关监听
3、注册代理变化监听器
4、重新初始化ControllerContext,
5、启动控制器和其他代理之间通信的ControllerChannelManager
6、创建用于删除主题的TopicDeletionManager对象,并启动。
7、启动分区状态机和副本状态机
8、轮询每个主题,添加监听分区变化的PartitionModificationsListener
9、如果设置了分区平衡定时 *** 作,那么创建分区平衡的定时任务,默认300秒检查并执行。
除了这些组件的启动外,onControllerFailover方法中还做了如下 *** 作:
1、/controller_epoch值+1,并且更新到ControllerContext
2、检查是否出发分区重分配,并做相关 *** 作
3、检查需要将优先副本选为leader,并做相关 *** 作
4、向kafka集群所有代理发送更新元数据的请求。
下面来看leader权限被取消时,调用的方法onControllerResignation
1、该方法中注销了控制器的权限。取消在zookeeper中对于分区、副本感知的相应监听器的监听。
2、关闭启动的各个组件
3、最后把ControllerContext中记录控制器版本的数值清零,并设置当前broker为RunnignAsBroker,变为普通的broker。
通过对控制器启动过程的学习,我们应该已经对kafka工作的原理有了了解, 核心是监听zookeeper的相关节点,节点变化时触发相应的 *** 作 。
有新的broker加入集群时,称为代理上线。反之,当broker关闭,推出集群时,称为代理下线。
代理上线:
1、新代理启动时向/brokers/ids写数据
2、BrokerChangeListener监听到变化。对新上线节点调用controllerChannelManageraddBroker(),完成新上线代理网络层初始化
3、调用KafkaControlleronBrokerStartup()处理
35恢复因新代理上线暂停的删除主题 *** 作线程
代理下线:
1、查找下线节点集合
2、轮询下线节点,调用controllerChannelManagerremoveBroker(),关闭每个下线节点网络连接。清空下线节点消息队列,关闭下线节点request请求
3、轮询下线节点,调用KafkaControlleronBrokerFailure处理
4、向集群全部存活代理发送updateMetadataRequest请求
顾名思义,协调器负责协调工作。本节所讲的协调器,是用来协调消费者工作分配的。简单点说,就是消费者启动后,到可以正常消费前,这个阶段的初始化工作。消费者能够正常运转起来,全有赖于协调器。
主要的协调器有如下两个:
1、消费者协调器(ConsumerCoordinator)
2、组协调器(GroupCoordinator)
kafka引入协调器有其历史过程,原来consumer信息依赖于zookeeper存储,当代理或消费者发生变化时,引发消费者平衡,此时消费者之间是互不透明的,每个消费者和zookeeper单独通信,容易造成羊群效应和脑裂问题。
为了解决这些问题,kafka引入了协调器。服务端引入组协调器(GroupCoordinator),消费者端引入消费者协调器(ConsumerCoordinator)。每个broker启动的时候,都会创建GroupCoordinator实例,管理部分消费组(集群负载均衡)和组下每个消费者消费的偏移量(offset)。每个consumer实例化时,同时实例化一个ConsumerCoordinator对象,负责同一个消费组下各个消费者和服务端组协调器之前的通信。如下图:
消费者协调器,可以看作是消费者做 *** 作的代理类(其实并不是),消费者很多 *** 作通过消费者协调器进行处理。
消费者协调器主要负责如下工作:
1、更新消费者缓存的MetaData
2、向组协调器申请加入组
3、消费者加入组后的相应处理
4、请求离开消费组
5、向组协调器提交偏移量
6、通过心跳,保持组协调器的连接感知。
7、被组协调器选为leader的消费者的协调器,负责消费者分区分配。分配结果发送给组协调器。
8、非leader的消费者,通过消费者协调器和组协调器同步分配结果。
消费者协调器主要依赖的组件和说明见下图:
可以看到这些组件和消费者协调器担负的工作是可以对照上的。
组协调器负责处理消费者协调器发过来的各种请求。它主要提供如下功能:
组协调器在broker启动的时候实例化,每个组协调器负责一部分消费组的管理。它主要依赖的组件见下图:
这些组件也是和组协调器的功能能够对应上的。具体内容不在详述。
下图展示了消费者启动选取leader、入组的过程。
消费者入组的过程,很好的展示了消费者协调器和组协调器之间是如何配合工作的。leader consumer会承担分区分配的工作,这样kafka集群的压力会小很多。同组的consumer通过组协调器保持同步。消费者和分区的对应关系持久化在kafka内部主题。
消费者消费时,会在本地维护消费到的位置(offset),就是偏移量,这样下次消费才知道从哪里开始消费。如果整个环境没有变化,这样做就足够了。但一旦消费者平衡 *** 作或者分区变化后,消费者不再对应原来的分区,而每个消费者的offset也没有同步到服务器,这样就无法接着前任的工作继续进行了。
因此只有把消费偏移量定期发送到服务器,由GroupCoordinator集中式管理,分区重分配后,各个消费者从GroupCoordinator读取自己对应分区的offset,在新的分区上继续前任的工作。
下图展示了不提交offset到服务端的问题:
开始时,consumer 0消费partition 0 和1,后来由于新的consumer 2入组,分区重新进行了分配。consumer 0不再消费partition2,而由consumer 2来消费partition 2,但由于consumer之间是不能通讯的,所有consumer2并不知道从哪里开始自己的消费。
因此consumer需要定期提交自己消费的offset到服务端,这样在重分区 *** 作后,每个consumer都能在服务端查到分配给自己的partition所消费到的offset,继续消费。
由于kafka有高可用和横向扩展的特性,当有新的分区出现或者新的消费入组后,需要重新分配消费者对应的分区,所以如果偏移量提交的有问题,会重复消费或者丢消息。偏移量提交的时机和方式要格外注意!!
1、自动提交偏移量
设置 enableautocommit为true,设定好周期,默认5s。消费者每次调用轮询消息的poll() 方法时,会检查是否超过了5s没有提交偏移量,如果是,提交上一次轮询返回的偏移量。
这样做很方便,但是会带来重复消费的问题。假如最近一次偏移量提交3s后,触发了再均衡,服务器端存储的还是上次提交的偏移量,那么再均衡结束后,新的消费者会从最后一次提交的偏移量开始拉取消息,此3s内消费的消息会被重复消费。
2、手动提交偏移量
设置 enableautocommit为false。程序中手动调用commitSync()提交偏移量,此时提交的是poll方法返回的最新的偏移量。
commitSync()是同步提交偏移量,主程序会一直阻塞,偏移量提交成功后才往下运行。这样会限制程序的吞吐量。如果降低提交频次,又很容易发生重复消费。
这里我们可以使用commitAsync()异步提交偏移量。只管提交,而不会等待broker返回提交结果
commitSync只要没有发生不可恢复错误,会进行重试,直到成功。而commitAsync不会进行重试,失败就是失败了。commitAsync不重试,是因为重试提交时,可能已经有其它更大偏移量已经提交成功了,如果此时重试提交成功,那么更小的偏移量会覆盖大的偏移量。那么如果此时发生再均衡,新的消费者将会重复消费消息。
3 启动服务
31 启动zookeeper
启动zk有两种方式,第一种是使用kafka自己带的一个zk。
bin/zookeeper-server-startsh config/zookeeperproperties&
另一种是使用其它的zookeeper,可以位于本机也可以位于其它地址。这种情况需要修改config下面的sercerproperties里面的zookeeper地址
。例如zookeeperconnect=102024179:2181
32 启动 kafka
bin/kafka-server-startsh config/serverproperties
4创建topic
bin/kafka-topicssh --create --zookeeper 102024179:2181 --replication-factor 1 --partitions 1 --topic test
创建一个名为test的topic,只有一个副本,一个分区。
通过list命令查看刚刚创建的topic
bin/kafka-topicssh -list -zookeeper 102024179:2181
5启动producer并发送消息启动producer
bin/kafka-console-producersh --broker-list localhost:9092 --topic test
启动之后就可以发送消息了
比如
test
hello boy
按Ctrl+C退出发送消息
6启动consumer
bin/kafka-console-consumersh --zookeeper 102024179:2181 --topic test --from-beginning
启动consumer之后就可以在console中看到producer发送的消息了
可以开启两个终端,一个发送消息,一个接受消息。
如果这样都不行的话,查看zookeeper进程和kafka的topic,一步步排查原因吧。
Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
入门请参照: >
描述主题的配置
bin/kafka-configssh --zookeeper localhost:2181 --describe --entity-type topics --entity-name test_topic
设置保留时间
# Deprecated way
bin/kafka-topicssh --zookeeper localhost:2181 --alter --topic test_topic --config retentionms=1000
# Modern way
bin/kafka-configssh --zookeeper localhost:2181 --alter --entity-ty
在使用kafka默认安装的zookeeper启动是的命令是
/opt/kafka/bin/zookeeper-server-startsh /opt/kafka/config/zookeeperproperties
使用我们自己安装的zookeeper启动命令是:
/zkServersh start
启动kafka
/opt/kafka/bin/kafka-server-startsh /opt/kafka/config/serverproperties
创建主题:
/opt/kafka/bin/kafka-topicsh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic 1707d
查看主题:
/opt/kafka/bin/kafka-topicssh --list 1707d --zookeeper localhost:2181
服务器端:
/opt/kafka/bin/kafka-console-producersh --broker-list localhost:9092 --topic 1707d
客户端的命令:
/opt/kafka/bin/kafka-console-consumersh --bootstrap-server localhost:9092 --topic 1707d (--from-beginning)
从第一行可以看到这个命令可以修改 topic, client, user 或 broker 的配置。
如果要设置 topic,就需要设置 entity-type 为topics,输入如下命令:
> bin/kafka-configssh --entity-type topics
Command must include exactly one action: --describe, --alter
命令提示需要指定一个 *** 作(不只是上面提示的两个 *** 作),增加--describe试试:
> bin/kafka-configssh --entity-type topics --describe
[root@localhost kafka_211-01021]# bin/kafka-configssh --entity-type topics --describe
Missing required argument "[zookeeper]"
继续增加 --zookeeper:
> bin/kafka-configssh --entity-type topics --describe --zookeeper localhost:2181
Configs for topic '__consumer_offsets' are segmentbytes=104857600,cleanuppolicy=compact,compressiontype=producer
由于没有指定主题名,这里显示了__consumer_offsets的信息。下面指定一个topic试试。
> bin/kafka-configssh --entity-type topics --describe --zookeeper localhost:2181 --entity-name test
Configs for topic 'test' are
此时显示了test主题的信息,这里是空。
因为Kafka完善的命令提示,可以很轻松的通过提示信息来进行下一步 *** 作,运用熟练后,基本上很快就能实现自己想要的命令。
pe topics --entity-name test_topic --add-config retentionms=1000
如果您需要删除主题中的所有消息,则可以利用保留时间。首先将保留时间设置为非常低(1000 ms),等待几秒钟,然后将保留时间恢复为上一个值。
注意:默认保留时间为24小时(86400000毫秒)。
删除主题
bin/kafka-topicssh --zookeeper localhost:2181 --delete --topic test_topic
注意:需要在Broker的配置文件serverproperties中配置 deletetopicenable=true 才能删除主题。
主题信息
bin/kafka-topicssh --describe --zookeeper localhost:2181 --topic test_topic
添加分区
bin/kafka-topicssh --alter --zookeeper localhost:2181 --topic test_topic --partitions 3
创建主题
bin/kafka-topicssh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test_topic
列出主题
bin/kafka-topicssh --list --zookeeper localhost:2181
关于集群方面,我总结的几个点就在于,不管是我们开启的那个服务,数据都是可以联想的,yes,完毕
在章节二( >
1、定位:分布式的消息队列系统,同时提供数据分布式缓存功能(默认7天)
2、消息持久化到磁盘,达到O(1)访问速度,预读和后写,对磁盘的顺序访问(比内存访问还要快)
3、Storm(分布式的实时计算框架)
Kafka目标成为队列平台
4、基本组件:
Broker:每一台机器是一个Broker
Producer:日志消息生产者,主要写数据
Consumer:日志消息消费者,主要读数据
Topic:是虚拟概念,不同的consumer去指定的topic去读数据,不同producer可以往不同的topic去写
Partition:是实际概念,文件夹,是在Topic的基础上做了进一步分层
5、Partition功能:负载均衡,需要保证消息的顺序性
顺序性的保证:订阅消息是从头往后读取的,写消息是尾部追加,所以整体消息是顺序的
如果有多个partiton存在,可能会出现顺序不一致的情况,原因:每个Partition相互独立
6、Topic:逻辑概念
一个或多个Partition组成一个Topic
7、Partition以文件夹的形式存在
8、Partition有两部分组成:
(1)index log:(存储索引信息,快速定位segment文件)
(2)message log:(真实数据的所在)
9、HDFS多副本的方式来完成数据高可用
如果设置一个Topic,假设这个Topic有5个Partition,3个replication
Kafka分配replication的算法:
假设:
将第i个Partition分配到(i % N)个Broker上
将第i个Partition的第j个replication分配到( (i+j) % N)个Broker上
虽然Partition里面有多个replication
如果里面有M个replication,其中有一个是Leader,其他M-1个follower
10、zookeeper包系统的可用性,zk中会保存一些meta信息(topic)
11、物理上,不同的topic的消息肯定是分开存储的
12、偏移量——offset:用来定位数据读取的位置
13、kafka内部最基本的消息单位——message
14、传输最大消息message的size不能超过1M,可以通过配置来修改
15、Consumer Group
16、传输效率:zero-copy
0拷贝:减少Kernel和User模式上下文的切换
直接把disk上的data传输给socket,而不是通过应用程序来传输
17、Kafka的消息是无状态的,消费者必须自己维护已消费的状态信息(offset)
减轻Kafka的实现难度
18、Kafka内部有一个时间策略:SLA——消息保留策略(消息超过一定时间后,会自动删除)
19、交付保证:
at least once:至少一次(会有重复、但不丢失)
at most once:最多发送一次(不重复、但可能丢失)
exactly once:只有一次(最理想),目前不支持,只能靠客户端维护
20、Kafka集群里面,topic内部由多个partition(包含多个replication),达到高可用的目的:
日志副本:保证可靠性
角色:主、从
ISR:是一个集合,只有在集合中的follower,才有机会被选为leader
如何让leader知道follower是否成功接收数据(心跳,ack)
如果心跳正常,代表节点活着
21、怎么算“活着”
(1)心跳
(2)如果follower能够紧随leader的更新,不至于被落的太远
如果一旦挂掉,从ISR集合把该节点删除掉
前提:需要把zookeeper提前启动好
一、单机版
1、启动进程:
]# /bin/kafka-server-startsh config/serverproperties
2、查看topic列表:
]# /bin/kafka-topicssh --list --zookeeper localhost:2181
3、创建topic:
]# /bin/kafka-topicssh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newyear_test
4、查看topic描述:
]# /bin/kafka-topicssh --describe --zookeeper localhost:2181 --topic newyear_test
5、producer发送数据:
]# /bin/kafka-console-producersh --broker-list localhost:9092 --topic newyear_test
6、consumer接收数据:
]# /bin/kafka-console-consumersh --zookeeper localhost:2181 --topic newyear_test --from-beginning
7、删除topic:
]# /bin/kafka-topicssh --delete --zookeeper localhost:2181 --topic newyear_test
二、集群版
在slave1和slave2上的brokerid一定设置不同
分别在slave1和slave2上开启进程:
/bin/kafka-server-startsh config/serverproperties
创建topic:
]# /bin/kafka-topicssh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic newyear_many_test
1、实现一个consumer group
首先在不同的终端分别开启consumer,保证groupid一致
]# python consumer_kafkapy
执行一次producer:
]# python producer_kafkapy
2、指定partition发送数据
]# python producer_kafka_2py
3、指定partition读出数据
]# python consumer_kafka_2py
consumer_kafkapy:
producer_kafkapy:
consumer_kafka_2py:
producer_kafka_2py:
1新建/conf/kafka_test/flume_kafkaconf
2启动flume:
]# flume-ng agent -c conf -f /conf/kafka_test/flume_kafkaconf -n a1 -Dflumerootlogger=INFO,console
启动成功,如下图:
3测试:
11flume监控产生的数据:
]# for i in seq 1 100 ; do echo '====> '$i >> 1log ; done
12kafka消费数据:
]# /bin/kafka-console-consumersh --zookeeper localhost:2181 --topic topic_1013 --from-beginning
消费结果如下图:
小马最近学习了《深入理解kafka 核心设计与实践原理》朱忠华 著 一书,机缘巧合中又看到了这篇文章,觉得整理得很是详细和全面,图文并茂很直观,在此摘录。
精华总结:依靠主题分区来类似分库分表的方式提高性能,用 副本主从 同步+ ISR(偏移量和HW) 来保证消息队列的可靠性,消费者提交 消费位移 来保证消息不丢失和重复消费等,用ZK来处理 服务发现 ,负载均衡,选举,集群管理,消费位移记录(以被推荐记录于kafka主题内)等。
HW之前的消息才能被消费者拉取,理解为都同步备份完了,才算生产者消息提交成功,对消费者可见。这种ISR机制影响了性能但是保证了可靠性,保证消息不丢失。消费位移提交,默认的是自动提交,异常下消息会重复消费会丢失,但可以参数配置手动提交,自行在业务处理完再提交。消费者拉的方式自主获取消费,便于消费者自行控制消费速率。默认分区规则是哈希一致性方式。
相比 Redis消息队列 本身的可靠性就不如,被消费者拉取完就认为消费完了,消息丢失,所以一般需要自行维护ack机制。
Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间,但是实际上,Kafka的特性之一就是高吞吐率。即使是普通的服务器, Kafka也可以轻松支持每秒百万级的写入请求 ,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。 Kafka速度的秘诀在于 ,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。
一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干 broker(作为 Kafka 节点的服务器),若干 Consumer(Group),以及一个 ZooKeeper 集群。Kafka通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡,在下一课介绍)。Producer 使用 push(推)模式将消息发布到 broker,Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。
Kafka 节点的 broker涉及 Topic、Partition 两个重要概念
在 Kafka 架构中,有几个术语:
Producer :生产者,即消息发送者,push 消息到 Kafka 集群中的 broker(就是 server)中;
Broker :Kafka 集群由多个 Kafka 实例(server) 组成,每个实例构成一个 broker,说白了就是服务器;
Topic :producer 向 kafka 集群 push 的消息会被归于某一类别,即Topic,这本质上只是一个逻辑概念,面向的对象是 producer 和 consumer,producer 只需要关注将消息 push 到哪一个 Topic 中,而 consumer 只需要关心自己订阅了哪个 Topic;
Partition :每一个 Topic 又被分为多个 Partitions,即物理分区;出于负载均衡的考虑,同一个 Topic 的 Partitions 分别存储于 Kafka 集群的多个 broker 上;而为了提高可靠性,这些 Partitions 可以由 Kafka 机制中的 replicas 来设置备份的数量;如上面的框架图所示,每个 partition 都存在两个备份;
Consumer :消费者,从 Kafka 集群的 broker 中 pull 消息、消费消息;
Consumer group :high-level consumer API 中,每个 consumer 都属于一个 consumer-group,每条消息只能被 consumer-group 中的一个 Consumer 消费,但可以被多个 consumer-group 消费;
replicas :partition 的副本,保障 partition 的高可用;
leader :replicas 中的一个角色, producer 和 consumer 只跟 leader 交互;
follower :replicas 中的一个角色,从 leader 中复制数据,作为副本,一旦 leader 挂掉,会从它的 followers 中选举出一个新的 leader 继续提供服务;
controller :Kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover;
ZooKeeper :Kafka 通过 ZooKeeper 来存储集群的 meta 信息等,文中将详述。
一个 topic 可以认为是一类消息,每个 topic 将被分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被追加到log文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型的数字,它唯一标记一条消息。 Kafka 机制中,producer push 来的消息是追加(append)到 partition 中的,这是一种顺序写磁盘的机制,效率远高于随机写内存,如下示意图:
Kafka 中 topic 的每个 partition 有一个预写式的日志文件,虽然 partition 可以继续细分为若干个 segment 文件,但是对于上层应用来说,仍然可以将 partition 看成最小的存储单元(一个有多个 segment 文件拼接的 “巨型” 文件),每个 partition 都由一些列有序的、不可变的消息组成,这些消息被连续的追加到 partition 中。
上图中有两个新名词:HW 和 LEO。这里先介绍下 LEO,LogEndOffset 的缩写,表示每个 partition 的 log 最后一条 Message 的位置。HW 是 HighWatermark 的缩写,是指 consumer 能够看到的此 partition 的位置,这个涉及到多副本的概念,这里先提及一下,下文再详述。
言归正传,为了提高消息的可靠性,Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中出现 broker 失效时,副本机制可保证服务可用。对于任何一个 partition,它的 N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 负责处理 partition 的所有读写请求,follower 则负责被动地去复制 leader 上的数据。如下图所示,Kafka 集群中有 4 个 broker,某 topic 有 3 个 partition,且复制因子即副本个数也为 3:
如果 leader 所在的 broker 发生故障或宕机,对应 partition 将因无 leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 leader 将从 follower 中被选举出来并继续处理客户端的请求。
上一节中讲到了同步副本队列 ISR(In-Sync Replicas)。虽然副本极大的增强了可用性,但是副本数量对 Kafka 的吞吐率有一定影响。默认情况下 Kafka 的 replica 数量为 1,即每个 partition 都只有唯一的 leader,无 follower,没有容灾能力。为了确保消息的可靠性,生产环境中,通常将其值(由 broker 的参数 offsetstopicreplicationfactor 指定)大小设置为大于 1,比如 3。 所有的副本(replicas)统称为 Assigned Replicas,即 AR。ISR 是 AR 中的一个子集,由 leader 维护 ISR 列表,follower 从 leader 同步数据有一些延迟(由参数 replicalagtimemaxms 设置超时阈值),超过阈值的 follower 将被剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新加入的 follower 也会先存放在 OSR 中。AR=ISR+OSR。
上面一节还涉及到一个概念,即 HW。HW 俗称高水位,HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置。另外每个 replica 都有 HW,leader 和 follower 各自负责更新自己的 HW 的状态。对于 leader 新写入的消息,consumer 不能立刻消费,leader 会等待该消息被所有 ISR 中的 replicas 同步后更新 HW,此时消息才能被 consumer 消费。这样就保证了如果 leader 所在的 broker 失效,该消息仍然可以从新选举的 leader 中获取。对于来自内部 broker 的读取请求,没有 HW 的限制。
下图详细的说明了当 producer 生产消息至 broker 后,ISR 以及 HW 和 LEO 的流转过程:
由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 都复制完,这条消息才会被 commit,这种复制方式受限于复制最慢的 follower,会极大的影响吞吐率。而异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 commit,这种情况下如果 follower 都还没有复制完,落后于 leader 时,突然 leader 宕机,则会丢失数据,降低可靠性。而 Kafka 使用 ISR 的策略则在可靠性和吞吐率方面取得了较好的平衡。
Kafka 的 ISR 的管理最终都会反馈到 ZooKeeper 节点上,具体位置为:
/brokers/topics/[topic]/partitions/[partition]/state
目前,有两个地方会对这个 ZooKeeper 的节点进行维护。
Controller 来维护:Kafka 集群中的其中一个 Broker 会被选举为 Controller,主要负责 Partition 管理和副本状态管理,也会执行类似于重分配 partition 之类的管理任务。在符合某些特定条件下,Controller 下的 LeaderSelector 会选举新的 leader,ISR 和新的 leader_epoch 及 controller_epoch 写入 ZooKeeper 的相关节点中。同时发起 LeaderAndIsrRequest 通知所有的 replicas。
leader 来维护:leader 有单独的线程定期检测 ISR 中 follower 是否脱离 ISR,如果发现 ISR 变化,则会将新的 ISR 的信息返回到 ZooKeeper 的相关节点中。
考虑这样一种场景:acks=-1,部分 ISR 副本完成同步,此时leader挂掉,如下图所示:follower1 同步了消息 4、5,follower2 同步了消息 4,与此同时 follower2 被选举为 leader,那么此时 follower1 中的多出的消息 5 该做如何处理呢?
类似于木桶原理,水位取决于最低那块短板。
如上图,某个 topic 的某 partition 有三个副本,分别为 A、B、C。A 作为 leader 肯定是 LEO 最高,B 紧随其后,C 机器由于配置比较低,网络比较差,故而同步最慢。这个时候 A 机器宕机,这时候如果 B 成为 leader,假如没有 HW,在 A 重新恢复之后会做同步(makeFollower) *** 作,在宕机时 log 文件之后直接做追加 *** 作,而假如 B 的 LEO 已经达到了 A 的 LEO,会产生数据不一致的情况,所以使用 HW 来避免这种情况。 A 在做同步 *** 作的时候,先将 log 文件截断到之前自己的 HW 的位置,即 3,之后再从 B 中拉取消息进行同步。
如果失败的 follower 恢复过来,它首先将自己的 log 文件截断到上次 checkpointed 时刻的 HW 的位置,之后再从 leader 中同步消息。leader 挂掉会重新选举,新的 leader 会发送 “指令” 让其余的 follower 截断至自身的 HW 的位置然后再拉取新的消息。
当 ISR 中的个副本的 LEO 不一致时,如果此时 leader 挂掉,选举新的 leader 时并不是按照 LEO 的高低进行选举,而是按照 ISR 中的顺序选举。
在 consumer 对指定消息 partition 的消息进行消费的过程中,需要定时地将 partition 消息的 消费进度 Offset 记录到 ZooKeeper上 ,以便在该 consumer 进行重启或者其它 consumer 重新接管该消息分区的消息消费权后,能够从之前的进度开始继续进行消息消费。Offset 在 ZooKeeper 中由一个专门节点进行记录,其节点路径为:
#节点内容就是Offset的值。/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
PS:Kafka 已推荐将 consumer 的 Offset 信息保存在 Kafka 内部的 topic 中,即:
__consumer_offsets(/brokers/topics/__consumer_offsets)
并且默认提供了 kafka_consumer_groupssh 脚本供用户查看consumer 信息(命令:sh kafka-consumer-groupssh –bootstrap-server –describe –group )。在当前版本中,offset 存储方式要么存储在本地文件中,要么存储在 broker 端,具体的存储方式取决 offsetstoremethod 的配置,默认是存储在 broker 端。
在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有:broker 注册、topic 注册、producer 和 consumer 负载均衡、维护 partition 与 consumer 的关系、记录消息消费的进度以及 consumer 注册等。
参考原文:
再谈基于 Kafka 和 ZooKeeper 的分布式消息队列原理
以上就是关于Kafka核心组件之控制器和协调器全部的内容,包括:Kafka核心组件之控制器和协调器、kafka消费者java版本读取不到消息怎么办、Kafka相关内容总结(Kafka集群搭建手记)等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)