
官方文档定义:kafka是一个分布式、可分区、多副本的日志系统。
kafka术语:
对于一个消息系统而言,追踪客户消费了什么,也即消息消费状态是每个消息系统必须提供的关键功能之一。
系统可以提供的几种可能消息传递保障有3种:
那么kafka是怎么解决的呢?
kafka的解决方案:
topic、partition、segment、offset的关系:
那么对于分区中的一个offset例如等于345552怎么去查找相应的message呢?
实际上offset的存储采用了稀疏索引,这样对于稠密索引来说节省了存储空间,但代价是查找费点时间。
小马最近学习了《深入理解kafka 核心设计与实践原理》朱忠华 著 一书,机缘巧合中又看到了这篇文章,觉得整理得很是详细和全面,图文并茂很直观,在此摘录。
精华总结:依靠主题分区来类似分库分表的方式提高性能,用 副本主从 同步+ ISR(偏移量和HW) 来保证消息队列的可靠性,消费者提交 消费位移 来保证消息不丢失和重复消费等,用ZK来处理 服务发现 ,负载均衡,选举,集群管理,消费位移记录(以被推荐记录于kafka主题内)等。
HW之前的消息才能被消费者拉取,理解为都同步备份完了,才算生产者消息提交成功,对消费者可见。这种ISR机制影响了性能但是保证了可靠性,保证消息不丢失。消费位移提交,默认的是自动提交,异常下消息会重复消费会丢失,但可以参数配置手动提交,自行在业务处理完再提交。消费者拉的方式自主获取消费,便于消费者自行控制消费速率。默认分区规则是哈希一致性方式。
相比 Redis消息队列 本身的可靠性就不如,被消费者拉取完就认为消费完了,消息丢失,所以一般需要自行维护ack机制。
Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间,但是实际上,Kafka的特性之一就是高吞吐率。即使是普通的服务器, Kafka也可以轻松支持每秒百万级的写入请求 ,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。 Kafka速度的秘诀在于 ,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。
一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干 broker(作为 Kafka 节点的服务器),若干 Consumer(Group),以及一个 ZooKeeper 集群。Kafka通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡,在下一课介绍)。Producer 使用 push(推)模式将消息发布到 broker,Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。
Kafka 节点的 broker涉及 Topic、Partition 两个重要概念
在 Kafka 架构中,有几个术语:
Producer :生产者,即消息发送者,push 消息到 Kafka 集群中的 broker(就是 server)中;
Broker :Kafka 集群由多个 Kafka 实例(server) 组成,每个实例构成一个 broker,说白了就是服务器;
Topic :producer 向 kafka 集群 push 的消息会被归于某一类别,即Topic,这本质上只是一个逻辑概念,面向的对象是 producer 和 consumer,producer 只需要关注将消息 push 到哪一个 Topic 中,而 consumer 只需要关心自己订阅了哪个 Topic;
Partition :每一个 Topic 又被分为多个 Partitions,即物理分区;出于负载均衡的考虑,同一个 Topic 的 Partitions 分别存储于 Kafka 集群的多个 broker 上;而为了提高可靠性,这些 Partitions 可以由 Kafka 机制中的 replicas 来设置备份的数量;如上面的框架图所示,每个 partition 都存在两个备份;
Consumer :消费者,从 Kafka 集群的 broker 中 pull 消息、消费消息;
Consumer group :high-level consumer API 中,每个 consumer 都属于一个 consumer-group,每条消息只能被 consumer-group 中的一个 Consumer 消费,但可以被多个 consumer-group 消费;
replicas :partition 的副本,保障 partition 的高可用;
leader :replicas 中的一个角色, producer 和 consumer 只跟 leader 交互;
follower :replicas 中的一个角色,从 leader 中复制数据,作为副本,一旦 leader 挂掉,会从它的 followers 中选举出一个新的 leader 继续提供服务;
controller :Kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover;
ZooKeeper :Kafka 通过 ZooKeeper 来存储集群的 meta 信息等,文中将详述。
一个 topic 可以认为是一类消息,每个 topic 将被分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被追加到log文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个 long 型的数字,它唯一标记一条消息。 Kafka 机制中,producer push 来的消息是追加(append)到 partition 中的,这是一种顺序写磁盘的机制,效率远高于随机写内存,如下示意图:
Kafka 中 topic 的每个 partition 有一个预写式的日志文件,虽然 partition 可以继续细分为若干个 segment 文件,但是对于上层应用来说,仍然可以将 partition 看成最小的存储单元(一个有多个 segment 文件拼接的 “巨型” 文件),每个 partition 都由一些列有序的、不可变的消息组成,这些消息被连续的追加到 partition 中。
上图中有两个新名词:HW 和 LEO。这里先介绍下 LEO,LogEndOffset 的缩写,表示每个 partition 的 log 最后一条 Message 的位置。HW 是 HighWatermark 的缩写,是指 consumer 能够看到的此 partition 的位置,这个涉及到多副本的概念,这里先提及一下,下文再详述。
言归正传,为了提高消息的可靠性,Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中出现 broker 失效时,副本机制可保证服务可用。对于任何一个 partition,它的 N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 负责处理 partition 的所有读写请求,follower 则负责被动地去复制 leader 上的数据。如下图所示,Kafka 集群中有 4 个 broker,某 topic 有 3 个 partition,且复制因子即副本个数也为 3:
如果 leader 所在的 broker 发生故障或宕机,对应 partition 将因无 leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 leader 将从 follower 中被选举出来并继续处理客户端的请求。
上一节中讲到了同步副本队列 ISR(In-Sync Replicas)。虽然副本极大的增强了可用性,但是副本数量对 Kafka 的吞吐率有一定影响。默认情况下 Kafka 的 replica 数量为 1,即每个 partition 都只有唯一的 leader,无 follower,没有容灾能力。为了确保消息的可靠性,生产环境中,通常将其值(由 broker 的参数 offsetstopicreplicationfactor 指定)大小设置为大于 1,比如 3。 所有的副本(replicas)统称为 Assigned Replicas,即 AR。ISR 是 AR 中的一个子集,由 leader 维护 ISR 列表,follower 从 leader 同步数据有一些延迟(由参数 replicalagtimemaxms 设置超时阈值),超过阈值的 follower 将被剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新加入的 follower 也会先存放在 OSR 中。AR=ISR+OSR。
上面一节还涉及到一个概念,即 HW。HW 俗称高水位,HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置。另外每个 replica 都有 HW,leader 和 follower 各自负责更新自己的 HW 的状态。对于 leader 新写入的消息,consumer 不能立刻消费,leader 会等待该消息被所有 ISR 中的 replicas 同步后更新 HW,此时消息才能被 consumer 消费。这样就保证了如果 leader 所在的 broker 失效,该消息仍然可以从新选举的 leader 中获取。对于来自内部 broker 的读取请求,没有 HW 的限制。
下图详细的说明了当 producer 生产消息至 broker 后,ISR 以及 HW 和 LEO 的流转过程:
由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 都复制完,这条消息才会被 commit,这种复制方式受限于复制最慢的 follower,会极大的影响吞吐率。而异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 commit,这种情况下如果 follower 都还没有复制完,落后于 leader 时,突然 leader 宕机,则会丢失数据,降低可靠性。而 Kafka 使用 ISR 的策略则在可靠性和吞吐率方面取得了较好的平衡。
Kafka 的 ISR 的管理最终都会反馈到 ZooKeeper 节点上,具体位置为:
/brokers/topics/[topic]/partitions/[partition]/state
目前,有两个地方会对这个 ZooKeeper 的节点进行维护。
Controller 来维护:Kafka 集群中的其中一个 Broker 会被选举为 Controller,主要负责 Partition 管理和副本状态管理,也会执行类似于重分配 partition 之类的管理任务。在符合某些特定条件下,Controller 下的 LeaderSelector 会选举新的 leader,ISR 和新的 leader_epoch 及 controller_epoch 写入 ZooKeeper 的相关节点中。同时发起 LeaderAndIsrRequest 通知所有的 replicas。
leader 来维护:leader 有单独的线程定期检测 ISR 中 follower 是否脱离 ISR,如果发现 ISR 变化,则会将新的 ISR 的信息返回到 ZooKeeper 的相关节点中。
考虑这样一种场景:acks=-1,部分 ISR 副本完成同步,此时leader挂掉,如下图所示:follower1 同步了消息 4、5,follower2 同步了消息 4,与此同时 follower2 被选举为 leader,那么此时 follower1 中的多出的消息 5 该做如何处理呢?
类似于木桶原理,水位取决于最低那块短板。
如上图,某个 topic 的某 partition 有三个副本,分别为 A、B、C。A 作为 leader 肯定是 LEO 最高,B 紧随其后,C 机器由于配置比较低,网络比较差,故而同步最慢。这个时候 A 机器宕机,这时候如果 B 成为 leader,假如没有 HW,在 A 重新恢复之后会做同步(makeFollower) *** 作,在宕机时 log 文件之后直接做追加 *** 作,而假如 B 的 LEO 已经达到了 A 的 LEO,会产生数据不一致的情况,所以使用 HW 来避免这种情况。 A 在做同步 *** 作的时候,先将 log 文件截断到之前自己的 HW 的位置,即 3,之后再从 B 中拉取消息进行同步。
如果失败的 follower 恢复过来,它首先将自己的 log 文件截断到上次 checkpointed 时刻的 HW 的位置,之后再从 leader 中同步消息。leader 挂掉会重新选举,新的 leader 会发送 “指令” 让其余的 follower 截断至自身的 HW 的位置然后再拉取新的消息。
当 ISR 中的个副本的 LEO 不一致时,如果此时 leader 挂掉,选举新的 leader 时并不是按照 LEO 的高低进行选举,而是按照 ISR 中的顺序选举。
在 consumer 对指定消息 partition 的消息进行消费的过程中,需要定时地将 partition 消息的 消费进度 Offset 记录到 ZooKeeper上 ,以便在该 consumer 进行重启或者其它 consumer 重新接管该消息分区的消息消费权后,能够从之前的进度开始继续进行消息消费。Offset 在 ZooKeeper 中由一个专门节点进行记录,其节点路径为:
#节点内容就是Offset的值。/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
PS:Kafka 已推荐将 consumer 的 Offset 信息保存在 Kafka 内部的 topic 中,即:
__consumer_offsets(/brokers/topics/__consumer_offsets)
并且默认提供了 kafka_consumer_groupssh 脚本供用户查看consumer 信息(命令:sh kafka-consumer-groupssh –bootstrap-server –describe –group )。在当前版本中,offset 存储方式要么存储在本地文件中,要么存储在 broker 端,具体的存储方式取决 offsetstoremethod 的配置,默认是存储在 broker 端。
在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有:broker 注册、topic 注册、producer 和 consumer 负载均衡、维护 partition 与 consumer 的关系、记录消息消费的进度以及 consumer 注册等。
参考原文:
再谈基于 Kafka 和 ZooKeeper 的分布式消息队列原理
每个kafka broker中配置文件serverproperties默认必须配置的属性如下:
#唯一标识在集群中的ID,要求是正数。
brokerid=0
#服务端口,默认9092
port=9092
#监听地址
hostname=debugo01
# 处理网络请求的最大线程数
numnetworkthreads=2
# 处理磁盘I/O的线程数
numiothreads=8
# 一些后台线程数
backgroundthreads = 4
# 等待IO线程处理的请求队列最大数
queuedmaxrequests = 500
# socket的发送缓冲区(SO_SNDBUF)
socketsendbufferbytes=1048576
# socket的接收缓冲区 (SO_RCVBUF)
socketreceivebufferbytes=1048576
# socket请求的最大字节数。为了防止内存溢出,messagemaxbytes必然要小于
socketrequestmaxbytes = 104857600
# 每个topic的分区个数,更多的partition会产生更多的segment file
numpartitions=2
# 是否允许自动创建topic ,若是false,就需要通过命令创建topic
autocreatetopicsenable =true
# 一个topic ,默认分区的replication个数 ,不能大于集群中broker的个数。
defaultreplicationfactor =1
# 消息体的最大大小,单位是字节
messagemaxbytes = 1000000
# Zookeeper quorum设置。如果有多个使用逗号分割
zookeeperconnect=debugo01:2181,debugo02,debugo03
# 连接zk的超时时间
zookeeperconnectiontimeoutms=1000000
# ZooKeeper集群中leader和follower之间的同步实际
zookeepersynctimems = 2000
#日志存放目录,多个目录使用逗号分割
logdirs=/var/log/kafka
# 日志清理策略(delete|compact)
logcleanuppolicy = delete
# 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。
logretentionhours=168
# 日志数据存储的最大字节数。超过这个时间会根据policy处理数据。
#logretentionbytes=1073741824
# 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
logsegmentbytes=536870912
# 当达到下面时间,会强制新建一个segment
logrollhours = 247
# 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(logretentionhours或logretentionbytes)
logretentioncheckintervalms=60000
# 是否开启压缩
logcleanerenable=false
# 对于压缩的日志保留的最长时间
logcleanerdeleteretentionms = 1 day
# 对于segment日志的索引文件大小限制
logindexsizemaxbytes = 10 1024 1024
#y索引计算的一个缓冲区,一般不需要设置。
logindexintervalbytes = 4096
# 是否自动平衡broker之间的分配策略
autoleaderrebalanceenable = false
# leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
leaderimbalanceperbrokerpercentage = 10
# 检查leader是否不平衡的时间间隔
leaderimbalancecheckintervalseconds = 300
# 客户端保留offset信息的最大空间大小
offsetmetadatamaxbytes = 1024
# Consumer端核心的配置是groupid、zookeeperconnect
# 决定该Consumer归属的唯一组ID,By setting the same group id multiple processes indicate that they are all part of the same consumer group
groupid
# 消费者的ID,若是没有设置的话,会自增
consumerid
# 一个用于跟踪调查的ID ,最好同groupid相同
clientid = <group_id>
# socket的超时时间,实际的超时时间为maxfetchwait + sockettimeoutms
sockettimeoutms= 30 1000
# socket的接收缓存空间大小
socketreceivebufferbytes=64 1024
#从每个分区fetch的消息大小限制
fetchmessagemaxbytes = 1024 1024
# true时,Consumer会在消费消息后将offset同步到zookeeper,这样当Consumer失败后,新的consumer就能从zookeeper获取最新的offset
autocommitenable = true
# 自动提交的时间间隔
autocommitintervalms = 60 1000
# 用于消费的最大数量的消息块缓冲大小,每个块可以等同于fetchmessagemaxbytes中数值
queuedmaxmessagechunks = 10
# 当有新的consumer加入到group时,将尝试reblance,将partitions的消费端迁移到新的consumer中, 该设置是尝试的次数
rebalancemaxretries = 4
# 每次reblance的时间间隔
rebalancebackoffms = 2000
# 每次重新选举leader的时间
refreshleaderbackoffms
# server发送到消费端的最小数据,若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。
fetchminbytes = 1
# 若是不满足fetchminbytes时,等待消费端请求的最长等待时间
fetchwaitmaxms = 100
# 如果指定时间内没有新消息可用于消费,就抛出异常,默认-1表示不受限
consumertimeoutms = -1
# 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip
metadatabrokerlist
#消息的确认模式
# 0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
# 1:发送消息,并会等待leader 收到确认后,一定的可靠性
# -1:发送消息,等待leader收到确认,并进行复制 *** 作后,才返回,最高的可靠性
requestrequiredacks = 0
# 异步模式下缓冲数据的最大时间。例如设置为100则会集合100ms内的消息后发送,这样会提高吞吐量,但是会增加消息发送的延时
queuebufferingmaxms = 5000
# 异步模式下缓冲的最大消息数,同上
queuebufferingmaxmessages = 10000
# 异步模式下,消息进入队列的等待时间。若是设置为0,则消息不等待,如果进入不了队列,则直接被抛弃
queueenqueuetimeoutms = -1
# 异步模式下,每次发送的消息数,当queuebufferingmaxmessages或queuebufferingmaxms满足条件之一时producer会触发发送。
batchnummessages=200
Kafka到底是个啥?用来干嘛的?
官方定义如下:
翻译过来,大致的意思就是,这是一个实时数据处理系统,可以横向扩展,并高可靠!
实时数据处理 ,从名字上看,很好理解,就是将数据进行实时处理,在现在流行的微服务开发中,最常用实时数据处理平台有 RabbitMQ、RocketMQ 等消息中间件。
这些中间件,最大的特点主要有两个:
在早期的 web 应用程序开发中,当请求量突然上来了时候,我们会将要处理的数据推送到一个队列通道中,然后另起一个线程来不断轮训拉取队列中的数据,从而加快程序的运行效率。
但是随着请求量不断的增大,并且队列通道的数据一致处于高负载,在这种情况下,应用程序的内存占用率会非常高,稍有不慎,会出现内存不足,造成程序内存溢出,从而导致服务不可用。
随着业务量的不断扩张,在一个应用程序内,使用这种模式已然无法满足需求,因此之后,就诞生了各种消息中间件,例如 ActiveMQ、RabbitMQ、RocketMQ等中间件。
采用这种模型,本质就是将要推送的数据,不在存放在当前应用程序的内存中,而是将数据存放到另一个专门负责数据处理的应用程序中,从而实现服务解耦。
消息中间件 :主要的职责就是保证能接受到消息,并将消息存储到磁盘,即使其他服务都挂了,数据也不会丢失,同时还可以对数据消费情况做好监控工作。
应用程序 :只需要将消息推送到消息中间件,然后启用一个线程来不断从消息中间件中拉取数据,进行消费确认即可!
引入消息中间件之后,整个服务开发会变得更加简单,各负其责。
Kafka 本质其实也是消息中间件的一种,Kafka 出自于 LinkedIn 公司,与 2010 年开源到 github。
LinkedIn 的开发团队,为了解决数据管道问题,起初采用了 ActiveMQ 来进行数据交换,大约是在 2010 年前后,那时的 ActiveMQ 还远远无法满足 LinkedIn 对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,LinkedIn 决定研发自己的消息传递系统, Kafka 由此诞生 。
在 LinkedIn 公司,Kafka 可以有效地处理每天数十亿条消息的指标和用户活动跟踪,其强大的处理能力,已经被业界所认可,并成为大数据流水线的首选技术。
先来看一张图, 下面这张图就是 kafka 生产与消费的核心架构模型 !
如果你看不懂这些概念没关系,我会带着大家一起梳理一遍!
简而言之,kafka 本质就是一个消息系统,与大多数的消息系统一样,主要的特点如下:
与 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在于,它有一个分区 Partition 的概念。
这个分区的意思就是说,如果你创建的 topic 有5个分区,当你一次性向 kafka 中推 1000 条数据时,这 1000 条数据默认会分配到 5 个分区中,其中每个分区存储 200 条数据。
这样做的目的,就是方便消费者从不同的分区拉取数据,假如你启动 5 个线程同时拉取数据,每个线程拉取一个分区,消费速度会非常非常快!
这是 kafka 与其他的消息系统最大的不同!
和其他的中间件一样,kafka 每次发送数据都是向 Leader 分区发送数据,并顺序写入到磁盘,然后 Leader 分区会将数据同步到各个从分区 Follower ,即使主分区挂了,也不会影响服务的正常运行。
那 kafka 是如何将数据写入到对应的分区呢?kafka中有以下几个原则:
与生产者一样,消费者主动的去kafka集群拉取消息时,也是从 Leader 分区去拉取数据。
这里我们需要重点了解一个名词: 消费组 !
考虑到多个消费者的场景,kafka 在设计的时候,可以由多个消费者组成一个消费组,同一个消费组者的消费者可以消费同一个 topic 下不同分区的数据,同一个分区只会被一个消费组内的某个消费者所消费,防止出现重复消费的问题!
但是不同的组,可以消费同一个分区的数据!
你可以这样理解,一个消费组就是一个客户端,一个客户端可以由很多个消费者组成,以便加快消息的消费能力。
但是,如果一个组下的消费者数量大于分区数量,就会出现很多的消费者闲置。
如果分区数量大于一个组下的消费者数量,会出现一个消费者负责多个分区的消费,会出现消费性能不均衡的情况。
因此,在实际的应用中,建议消费者组的 consumer 的数量与 partition 的数量保持一致!
光说理论可没用,下面我们就以 centos7 为例,介绍一下 kafka 的安装和使用。
kafka 需要 zookeeper 来保存服务实例的元信息,因此在安装 kafka 之前,我们需要先安装 zookeeper。
zookeeper 安装环境依赖于 jdk,因此我们需要事先安装 jdk
下载zookeeper,并解压文件包
创建数据、日志目录
配置zookeeper
重新配置 dataDir 和 dataLogDir 的存储路径
最后,启动 Zookeeper 服务
到官网 >
# kafka
springkafkabootstrap-servers=101257041:9092,101257035:9092,101257036:9092
#client-id
springkafkaclient-id=group1
生产者参数
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应
springkafkaproduceracks=1
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
springkafkaproducerbatch-size=16384
# 发生错误后,消息重发的次数。
springkafkaproducerretries=3
# 设置生产者内存缓冲区的大小。
springkafkaproducerbuffer-memory=33554432
springkafkaproducerkey-serializer=orgapachekafkacommonserializationStringSerializer
springkafkaproducervalue-serializer=orgapachekafkacommonserializationStringSerializer
消费者参数
# 自动提交的时间间隔
springkafkaconsumerauto-commit-interval=1000
# offset的消费位置
springkafkaconsumerauto-offset-reset=latest
# 是否自动提交
springkafkaconsumerenable-auto-commit=false
# 最大拉取间隔时间
springkafkaconsumermaxpollintervalms =600000
# 会话超时时间
springkafkaconsumersessiontimeoutms =10000
springkafkaconsumerkey-deserializer=orgapachekafkacommonserializationStringDeserializer
springkafkaconsumervalue-deserializer=orgapachekafkacommonserializationStringDeserializer
# 消费组名称
springkafkaconsumergroupId=dmsdecision
# 最大拉取条数
springkafkaconsumermax-poll-records=30
# 心跳时间
springkafkaconsumerheartbeat-interval=3000
# kafka springkafkapropertiesparsefileContainerFactory_concurrency监听线程数未设置时,本参数生效
springkafkalistenerconcurrency=30
#MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgmentacknowledge()后提交
#MANUAL_IMMEDIATE 手动调用Acknowledgmentacknowledge()后立即提交
#RECORD 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
#BATCH 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
#TIME 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
#COUNT 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
#COUNT_TIME TIME或COUNT 有一个条件满足时提交
# ack_mode为COUNT/COUNT_TIME 时配置
springkafkalistenerack-mode=manual_immediate
# ack_mode为COUNT/COUNT_TIME 时配置
springkafkalistenerack-count=
# ack_mode为/COUNT_TIME 时配置
springkafkalistenerack-time=
# poll拉取数据超时时间
springkafkalistenerpoll-timeout=
以上就是关于kafka基本数据讲解全部的内容,包括:kafka基本数据讲解、[转载]kafka入门笔记、Kafka系列-主要参数详解等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)