![[转载]kafka入门笔记,第1张 [转载]kafka入门笔记,第1张](/aiimages/%5B%E8%BD%AC%E8%BD%BD%5Dkafka%E5%85%A5%E9%97%A8%E7%AC%94%E8%AE%B0.png)
小马最近学习了《深入理解kafka 核心设计与实践原理》朱忠华 著 一书,机缘巧合中又看到了这篇文章,觉得整理得很是详细和全面,图文并茂很直观,在此摘录。
精华总结:依靠主题分区来类似分库分表的方式提高性能,用 副本主从 同步+ ISR(偏移量和HW) 来保证消息队列的可靠性,消费者提交 消费位移 来保证消息不丢失和重复消费等,用ZK来处理 服务发现 ,负载均衡,选举,集群管理,消费位移记录(以被推荐记录于kafka主题内)等。
HW之前的消息才能被消费者拉取,理解为都同步备份完了,才算生产者消息提交成功,对消费者可见。这种ISR机制影响了性能但是保证了可靠性,保证消息不丢失。消费位移提交,默认的是自动提交,异常下消息会重复消费会丢失,但可以参数配置手动提交,自行在业务处理完再提交。消费者拉的方式自主获取消费,便于消费者自行控制消费速率。默认分区规则是哈希一致性方式。
相比 Redis消息队列 本身的可靠性就不如,被消费者拉取完就认为消费完了,消息丢失,所以一般需要自行维护ack机制。
Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间,但是实际上,Kafka的特性之一就是高吞吐率。即使是普通的服务器, Kafka也可以轻松支持每秒百万级的写入请求 ,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。 Kafka速度的秘诀在于 ,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。
一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干 broker(作为 Kafka 节点的服务器),若干 Consumer(Group),以及一个 ZooKeeper 集群。Kafka通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡,在下一课介绍)。Producer 使用 push(推)模式将消息发布到 broker,Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。
Kafka 节点的 broker涉及 Topic、Partition 两个重要概念
在 Kafka 架构中,有几个术语:
Producer :生产者,即消息发送者,push 消息到 Kafka 集群中的 broker(就是 server)中;
Broker :Kafka 集群由多个 Kafka 实例(server) 组成,每个实例构成一个 broker,说白了就是服务器;
Topic :producer 向 kafka 集群 push 的消息会被归于某一类别,即Topic,这本质上只是一个逻辑概念,面向的对象是 producer 和 consumer,producer 只需要关注将消息 push 到哪一个 Topic 中,而 consumer 只需要关心自己订阅了哪个 Topic;
Partition :每一个 Topic 又被分为多个 Partitions,即物理分区;出于负载均衡的考虑,同一个 Topic 的 Partitions 分别存储于 Kafka 集群的多个 broker 上;而为了提高可靠性,这些 Partitions 可以由 Kafka 机制中的 replicas 来设置备份的数量;如上面的框架图所示,每个 partition 都存在两个备份;
Consumer :消费者,从 Kafka 集群的 broker 中 pull 消息、消费消息;
Consumer group :high-level consumer API 中,每个 consumer 都属于一个 consumer-group,每条消息只能被 consumer-group 中的一个 Consumer 消费,但可以被多个 consumer-group 消费;
replicas :partition 的副本,保障 partition 的高可用;
leader :replicas 中的一个角色, producer 和 consumer 只跟 leader 交互;
follower :replicas 中的一个角色,从 leader 中复制数据,作为副本,一旦 leader 挂掉,会从它的 followers 中选举出一个新的 leader 继续提供服务;
controller :Kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover;
ZooKeeper :Kafka 通过 ZooKeeper 来存储集群的 meta 信息等,文中将详述。
一个 topic 可以认为是一类消息,每个 topic 将被分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被追加到log文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型的数字,它唯一标记一条消息。 Kafka 机制中,producer push 来的消息是追加(append)到 partition 中的,这是一种顺序写磁盘的机制,效率远高于随机写内存,如下示意图:
Kafka 中 topic 的每个 partition 有一个预写式的日志文件,虽然 partition 可以继续细分为若干个 segment 文件,但是对于上层应用来说,仍然可以将 partition 看成最小的存储单元(一个有多个 segment 文件拼接的 “巨型” 文件),每个 partition 都由一些列有序的、不可变的消息组成,这些消息被连续的追加到 partition 中。
上图中有两个新名词:HW 和 LEO。这里先介绍下 LEO,LogEndOffset 的缩写,表示每个 partition 的 log 最后一条 Message 的位置。HW 是 HighWatermark 的缩写,是指 consumer 能够看到的此 partition 的位置,这个涉及到多副本的概念,这里先提及一下,下文再详述。
言归正传,为了提高消息的可靠性,Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中出现 broker 失效时,副本机制可保证服务可用。对于任何一个 partition,它的 N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 负责处理 partition 的所有读写请求,follower 则负责被动地去复制 leader 上的数据。如下图所示,Kafka 集群中有 4 个 broker,某 topic 有 3 个 partition,且复制因子即副本个数也为 3:
如果 leader 所在的 broker 发生故障或宕机,对应 partition 将因无 leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 leader 将从 follower 中被选举出来并继续处理客户端的请求。
上一节中讲到了同步副本队列 ISR(In-Sync Replicas)。虽然副本极大的增强了可用性,但是副本数量对 Kafka 的吞吐率有一定影响。默认情况下 Kafka 的 replica 数量为 1,即每个 partition 都只有唯一的 leader,无 follower,没有容灾能力。为了确保消息的可靠性,生产环境中,通常将其值(由 broker 的参数 offsetstopicreplicationfactor 指定)大小设置为大于 1,比如 3。 所有的副本(replicas)统称为 Assigned Replicas,即 AR。ISR 是 AR 中的一个子集,由 leader 维护 ISR 列表,follower 从 leader 同步数据有一些延迟(由参数 replicalagtimemaxms 设置超时阈值),超过阈值的 follower 将被剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新加入的 follower 也会先存放在 OSR 中。AR=ISR+OSR。
上面一节还涉及到一个概念,即 HW。HW 俗称高水位,HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置。另外每个 replica 都有 HW,leader 和 follower 各自负责更新自己的 HW 的状态。对于 leader 新写入的消息,consumer 不能立刻消费,leader 会等待该消息被所有 ISR 中的 replicas 同步后更新 HW,此时消息才能被 consumer 消费。这样就保证了如果 leader 所在的 broker 失效,该消息仍然可以从新选举的 leader 中获取。对于来自内部 broker 的读取请求,没有 HW 的限制。
下图详细的说明了当 producer 生产消息至 broker 后,ISR 以及 HW 和 LEO 的流转过程:
由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 都复制完,这条消息才会被 commit,这种复制方式受限于复制最慢的 follower,会极大的影响吞吐率。而异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 commit,这种情况下如果 follower 都还没有复制完,落后于 leader 时,突然 leader 宕机,则会丢失数据,降低可靠性。而 Kafka 使用 ISR 的策略则在可靠性和吞吐率方面取得了较好的平衡。
Kafka 的 ISR 的管理最终都会反馈到 ZooKeeper 节点上,具体位置为:
/brokers/topics/[topic]/partitions/[partition]/state
目前,有两个地方会对这个 ZooKeeper 的节点进行维护。
Controller 来维护:Kafka 集群中的其中一个 Broker 会被选举为 Controller,主要负责 Partition 管理和副本状态管理,也会执行类似于重分配 partition 之类的管理任务。在符合某些特定条件下,Controller 下的 LeaderSelector 会选举新的 leader,ISR 和新的 leader_epoch 及 controller_epoch 写入 ZooKeeper 的相关节点中。同时发起 LeaderAndIsrRequest 通知所有的 replicas。
leader 来维护:leader 有单独的线程定期检测 ISR 中 follower 是否脱离 ISR,如果发现 ISR 变化,则会将新的 ISR 的信息返回到 ZooKeeper 的相关节点中。
考虑这样一种场景:acks=-1,部分 ISR 副本完成同步,此时leader挂掉,如下图所示:follower1 同步了消息 4、5,follower2 同步了消息 4,与此同时 follower2 被选举为 leader,那么此时 follower1 中的多出的消息 5 该做如何处理呢?
类似于木桶原理,水位取决于最低那块短板。
如上图,某个 topic 的某 partition 有三个副本,分别为 A、B、C。A 作为 leader 肯定是 LEO 最高,B 紧随其后,C 机器由于配置比较低,网络比较差,故而同步最慢。这个时候 A 机器宕机,这时候如果 B 成为 leader,假如没有 HW,在 A 重新恢复之后会做同步(makeFollower) *** 作,在宕机时 log 文件之后直接做追加 *** 作,而假如 B 的 LEO 已经达到了 A 的 LEO,会产生数据不一致的情况,所以使用 HW 来避免这种情况。 A 在做同步 *** 作的时候,先将 log 文件截断到之前自己的 HW 的位置,即 3,之后再从 B 中拉取消息进行同步。
如果失败的 follower 恢复过来,它首先将自己的 log 文件截断到上次 checkpointed 时刻的 HW 的位置,之后再从 leader 中同步消息。leader 挂掉会重新选举,新的 leader 会发送 “指令” 让其余的 follower 截断至自身的 HW 的位置然后再拉取新的消息。
当 ISR 中的个副本的 LEO 不一致时,如果此时 leader 挂掉,选举新的 leader 时并不是按照 LEO 的高低进行选举,而是按照 ISR 中的顺序选举。
在 consumer 对指定消息 partition 的消息进行消费的过程中,需要定时地将 partition 消息的 消费进度 Offset 记录到 ZooKeeper上 ,以便在该 consumer 进行重启或者其它 consumer 重新接管该消息分区的消息消费权后,能够从之前的进度开始继续进行消息消费。Offset 在 ZooKeeper 中由一个专门节点进行记录,其节点路径为:
#节点内容就是Offset的值。/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
PS:Kafka 已推荐将 consumer 的 Offset 信息保存在 Kafka 内部的 topic 中,即:
__consumer_offsets(/brokers/topics/__consumer_offsets)
并且默认提供了 kafka_consumer_groupssh 脚本供用户查看consumer 信息(命令:sh kafka-consumer-groupssh –bootstrap-server –describe –group )。在当前版本中,offset 存储方式要么存储在本地文件中,要么存储在 broker 端,具体的存储方式取决 offsetstoremethod 的配置,默认是存储在 broker 端。
在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有:broker 注册、topic 注册、producer 和 consumer 负载均衡、维护 partition 与 consumer 的关系、记录消息消费的进度以及 consumer 注册等。
参考原文:
再谈基于 Kafka 和 ZooKeeper 的分布式消息队列原理
前面重点说了消费组的再平衡,等待同步,稳定状态。还有一种状态是离开状态。
消费者离开消费组的情况:消费者应用程序关闭或者消费者不订阅主题了。此时协调者就不需要再在消费组中管理此消费者了。
离开消费组的方法:1,消费者取消心跳任务。2,发送离开组请求。
协调者在处理离开组请求时,首先移除心跳检测,然后将消费者从消费组元数据中移除。因为要 *** 作消费组元数据,所以需要对消费组元数据进行加锁。
消费组状态是准备再平衡
准备再平衡时,一定有延迟 *** 作对象存在,并且还不能完成。这时如果有消费者选择离开,有可能会使延迟 *** 作对象完成,所以每次处理离开请求都会去尝试完成延迟 *** 作对象。
消费组状态是等待同步
等待同步说明延迟 *** 作已经完成,消费者已经存在于消费组中,并且已经收到了加入组响应。这时主消费者正在分配分区,这时如果有消费者要离开,那么原本分配给这个离开的消费者的分区就没有意义了。所以在等待同步状态下处理离开请求,要改变消费组的状态为准备再平衡。让其他消费者重新发送加入组请求。
消费组状态是稳定
稳定状态下的 *** 作和等待同步一样。同样原本分配给这个离开的消费者的分区必须分给消费组中其他的消费者。所以要改变消费组的状态为准备再平衡。让其他消费者重新发送加入组请求。
在协调者把消费组的状态变为准备再平衡的时候,会创建一个延迟 *** 作对象。这个延迟 *** 作对象会等待消费组中所有的消费者都重新发送加入组请求后才能完成。这里就存在一个问题,如果某一个消费者一直不重新发送加入组请求,那么导致延迟 *** 作对象一直都不会完成,协调者就一直不会发送加入组响应给消费者。所以必须设置一个超时时间,让过了超时时间后,延迟 *** 作对象还不能完成,就强制完成。
延迟的加入组 *** 作对象,会选择消费组中所有消费者会话超时时间的最大值,作为延迟 *** 作的超时时间。在过了超时时间后延迟 *** 作对象会被强制完成。在完成延迟 *** 作时,协调者会找出那些没有在规定时间内重新发送加入组请求的消费者,将它们从消费组中移除。因为在完成延迟加入组 *** 作对象时,会发送加入组响应给消费组中所有的消费者,所以要在事先移除掉超时未发送请求的消费者。
协调者返回加入组响应给消费者后,都会立即完成本次的延迟心跳,并创建下一次延迟心跳(针对消费组中所有消费者都会完成延迟心跳)。延迟心跳是用来对各个消费者的监控,检查消费者是否存活,它的超时时间是消费者的会话超时时间。延迟的心跳 *** 作对象什么时候完成?外部依赖条件是协调者和消费者之间网络通信,不管是协调者处理消费者的各种请求,还是协调者发送给消费者的响应,都会去完成延迟心跳,并创建下一次的延迟心跳。消费组的一次再平衡 *** 作过程中,协调者只会创建一个延迟的加入 *** 作对象,并且会为每一个消费者都保存一个延迟心跳对象。延迟心跳的创建是在协调者发送加入组响应给消费者后,就会为每个消费者创建一个延迟心跳。消费者收到加入组响应后,应该在会话时间内及时发送同步组请求给协调者,因为这个时候在协调者侧已经创建了延迟心跳用来监控消费者,如果没有及时发送,那么协调者就会认为消费者故障,从而让消费者离开消费组(按照离开消费组请求的逻辑处理)。
在处理同步组请求时,有多个地方的调用可以去本次完成延迟心跳和创建下一次的延迟心跳。
1,状态为等待同步,设置消费者元数据的回调方法后调用。针对一个非主消费者。
2,状态为稳定,在发送同步组响应给消费者后调用。针对一个消费者(发送同步请求不及时,但是未超时的非主消费者)。
3,状态为等待同步,收到主消费者的同步组请求,给每个消费者发送同步组响应后调用。针对消费组里面的所有已经发送了同步组请求的消费者。
延迟心跳的尝试完成方法的判断条件是:消费者是否存活。判断消费者是否存活有三种条件,只要满足其中一种,就认为消费者是存活的。
1,消费者元数据中的awaitingJoinCallback回调方法不为空。
2,消费者元数据中的awaitingSyncCallback回调方法不为空。
3,消费者最近的心跳时间加上会话超时时间大于下一次心跳截止时间。
延迟心跳的截止时间是在创建延迟心跳时指定的,延迟心跳的创建是在完成上一次的延迟心跳 *** 作之后创建下一次的延迟心跳。在完成了上一次的延迟心跳后,协调者会计算出下一次延迟心跳的截止时间,并创建新的延迟心跳,延迟心跳创建后,和延迟的加入一样,都会马上尝试去完成这个延迟心跳,但是如果是刚刚创建的延迟心跳就尝试去完成是不会完成的。因为刚刚创建的延迟心跳的截止时间等于最新的时间加上会话超时时间。所以不会完成。
有三个地方会去完成延迟心跳并创建下一次的延迟心跳:
1,协调者返回加入组响应给每个消费者后。
2,协调者处理消费者的同步组请求设置回调方法时。
3,协调者返回同步组响应给每个消费者后。
每次创建新的延迟心跳都会计算最新的截止时间,如果没有在下一次心跳截止时间之前完成延迟心跳并创建下一次的延迟心跳,那么延迟心跳就会超时,对应的消费者就 可能 被协调者从消费组中移除(协调者创建的每一个延迟心跳都和消费者一一对应,只要消费者存活,都对应延迟缓存中的一个延迟心跳)。
为什么说过了超时时间可能被协调者清除喃?因为还有其他的两个条件awaitingJoinCallback和awaitingSyncCallback,只要满足这两个条件其中的一个,就算是超时了也会认为消费者存活。为什么需要这样设计喃?设想一下这个场景,在等待同步状态下,有三个消费者(C1,C2,C3;C3是主消费者)。C1的新延迟心跳的截止时间为10秒,C2的新延迟心跳的截止时间为20秒,C3的新延迟心跳的截止时间为60秒。C1和C2都发送同步组请求设置回调方法:awaitingSyncCallback,完成旧的延迟心跳并创建了新的延迟心跳:C1的新延迟心跳的截止时间为20秒,C2的新延迟心跳的截止时间为40秒,这个时候只会完成C1和C2的延迟心跳,C3的旧延迟心跳还存在截止时间为60秒,C3由于自身原因在分区分配时花费了比较久的时候,在45秒的时候才发送同步组请求,在这个时间点上按理说C1和C2早就应该超时被移除消费组了,如果被移除就是纯粹的误杀。C1和C2其实这个时候正在等到协调者的同步组响应。所以如果awaitingSyncCallback不为空的话,就算是超时了,要认为消费者存活,上面的场景在收到C3主消费者的同步组请求后,返回同步组响应给所有的消费者,这是完成C1,C2,C3的延迟心跳并计算出下一次延迟心跳的截止时间创建新的延迟心跳。
协调者在处理消费者的加入组请求时,会设置awaitingJoinCallback回调方法。但是设置之后不会去调用完成延迟心跳和创建下一次的延迟心跳。这里假如:C1和C2已经在消费组里面了,必定也会有与之对应的延迟心跳,下一次心跳的截止时间时间为:C1:10秒,C2:20秒。新的消费者C3发送了加入组请求,那么C1和C2必须在心跳的截止时间内重新发送加入组请求,C1马上就发送了加入组请求,但是延迟加入 *** 作对象不能完成,所以不会发送加入组响应给客户端就不能去完成延迟心跳,10秒后,C1的延迟心跳就超时了,按理说C1会被协调者移除消费组,但是由于协调者在处理C1的加入组请求是设置了awaitingJoinCallback回调方法,所以C1不会被移除,认为是存活的。C2在15秒的时候发送了加入组请求,延迟加入 *** 作可以完成,返回加入组响应给三个消费者,并更新下次心跳的截止时间为:C1:25秒,C2:35秒,C1:75秒。
当消费组的状态变为稳定后,每个消费者都需要重新发送心跳给协调者。
为什么在处理加入组请求时,不去完成延迟心跳喃?按理说消费者能发送加入组请求,就代表消费者存活?
消费者能发送加入组请求是能代表消费者存活,但是现在协调者的处理再平衡状态的时候,认为消费组是不稳定的,在消费组不稳定的时候去设置心跳没有多大意义(通过前面的分析也知道,消费组在再平衡状态中,多个消费者会发送多次的加入组请求,消费组才会最终稳定)。所以在发送加入组响应给消费者后去完成心跳这个时候消费组中的所有消费者都发送了加入组请求,这样用心跳去管理消费者才有意义。所以在处理加入组请求时,不去完成延迟心跳是个很不错的设计。
参考资料:
Kafka技术内幕:图文详解Kafka源码设计与实现
# flink消费kafka细节
Apache kafka connector提供对Kafka服务的事件流的访问。Flink提供了特殊的Kafka连接器,用于从Kafka主题读写数据。 Flink Kafka Consumer与Flink的检查点机制集成在一起,以提供一次精确的处理语义。 为此,Flink不仅仅依赖于Kafka的消费者群体偏移量跟踪,还内部跟踪和检查这些偏移量。
请为您的用例和环境选择一个包(Maven项目ID)和类名。 对于大多数用户来说,FlinkKafkaConsumer08(flink-connector-kafka的一部分)是合适的。
| Maven Dependency | Supported since | Consumer and Producer Class name | Kafka version | Notes |
| :------------------------------ | :-------------- | :------------------------------------------ | :------------ | :----------------------------------------------------------- |
| flink-connector-kafka-08_211 | 100 | FlinkKafkaConsumer08 FlinkKafkaProducer08 | 08x | Uses the [SimpleConsumer](>
以上就是关于[转载]kafka入门笔记全部的内容,包括:[转载]kafka入门笔记、绍圣--kafka之消费者(八)、flink消费kafka细节等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)