
关键参数--generate
在进行分区副本重分配之前,最好是用下面方式获取一个合理的分配文件;
编写move-json-filejson文件; 这个文件就是告知想对哪些topic进行重新分配的计算。
然后执行下面的脚本,--broker-list "0,1,2" 这个参数是你想要分配的Brokers。
执行完毕之后会打印:
需求注意的是,此时分区移动尚未开始,它只是告诉你当前的分配和建议。保存当前分配,以防你想要回滚它。
关键参数--execute将上面得到期望的重新分配方式文件保存在一个json文件里面 reassignment-json-filejson
kafka并没有提供一个专门的脚本来支持副本的扩缩, 不像kafka-topicsh脚本一样,是可以扩分区的;
想要对副本进行扩缩,只能是曲线救国,利用kafka-reassign-partitionssh来重新分配副本。
假设我们当前的情况是 3分区1副本,为了提供可用性,我想把副本数升到2;
计算副本分配方式
我们用 --generate 获取一下当前的分配情况,得到如下json
数据迁移、分区副本重分配、跨路径迁移、副本扩缩容
>
一、 K afka的三大组件:Producer、Server、Consumer
1、Kafka的 Producer 写入消息
producer采用push(推)模式将消息发布到broker,每条消息,都被追加到分区中(顺序写到磁盘,比随机写内存效率高)。
· 分区的作用:方便容量扩展,可以多并发读写数据,所以我们会指定多个分区进行数据存储。
· 一般根据 event_key的hash % numPartitions来确定写入哪个分区,如果写入时没有指定key,则轮询写入每个分区;因此导致每个partition中消息是有序的,整体无序。
每条event数据写入partitionA中,并且只会写入partitionA_leader,当partitionA_leader写入完成后partitionA_flower节点再去partitionA_leader上异步拉取数据;默认ack为1,表示不会等待partitionA_flowers写入完成;如果设置ack为副本数或ack=-1,则等待副本全部写完,再写入下一条数据。
2、kafka的 broker—— 保存消息
1、 创建topic,并指定分区和副本数
2、每个分区(partition)有一个leader,多个follower,pull数据时先寻找leader,只会读leader上的数据,leader和follower不会在一个节点上,leader节点宕机后,其中一个follower变成leader
3、 消息数据存在每个分区中,默认配置每条消息保存7天 或 分区达到1GB 后删除数据
3、 K afka的 Consumer 消费数据:
1、consumer采用pull(拉)模式从broker中读取数据。
2、如果一个消费者来消费同一个topic下不同分区的数据,会读完一个分区再读下一个分区
生产者(producer)A PI 只有一套 ; 但是消费者(consumer)A PI 有两套(高级A PI 和低级A PI )
一、高级API:
Zookeeper管理offset(默认从最后一个开始读新数据,可以配置从开头读)
kafka server(kafka服务)管理分区、副本
二、低级API:
开发者自己控制offset,想从哪里读就从哪里读
// SimpleConsumer是Kafka用来读数据的类
// 通过send()方法获取元数据找到leader
TopicMetadataResponse metadataResponse = simpleConsumersend(request); //通过metadataResponse获取topic元数据,在获取topic中每个分区的元数据
// fetch 抓取数据
FetchResponse response = simpleConsumerfetch(fetchRequest);
// 解析抓取到的数据
ByteBufferMessageSet messageAndOffsets = responsemessageSet(topic, partition);
二、数据、broker状态,consumer状态的存储
一、在本地存储原始消息数据:
1、hash取模得分区、kafka中每条消息有一个Key,用来确定 每条数据存储到哪个分区中
2、轮询
3、自定义分区
二、在zookeeper存储kafka的元数据
三、存储consumer的offset数据
每个consumer有一个Key(broker+Topic+partition)的hash,再取模后 用来确定offset存到哪个系统文件中,Value是partitionMetaData。
1、使用zookeeper启动,zookeeper来存储offset
消费者 消费消息时,offset(消费到的下标)会保存在consumer本地和zookeeper中(由本地上传到zookeeper中,所以本地会保存offset)
2、使用bootstrap启动,本地存储offset(在本地可以减少两节点交互),zookeeper存储其他数据
三、某 F lume对接Kafka案例
consumer 采用 pull(拉)模式从 broker 中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数
据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有
数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。
分区中的所有副本统称为 AR(Assigned Replicas)。所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一个子集。
可以通过分区策略体现消息顺序性。分区策略有轮询策略、随机策略、按消息键保序策略。
处理顺序 :拦截器->序列化器->分区器
消息在通过 send() 方法发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是所要发往的分区号。
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。
一般来说如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。开发者可以继承AbstractPartitionAssignor实现自定义消费策略,从而实现同一消费组内的任意消费者都可以消费订阅主题的所有分区。
当前消费者需要提交的消费位移是offset+1
在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。
Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件。
Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。
每个日志分段文件对应了两个索引文件,主要用来提高查找消息的效率。
日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。
日志压缩(Log Compaction):针对每个消息的 key 进行整合,对于有相同 key 的不同 value 值,只保留最后一个版本。
在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用 kafka-topicssh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka 中有多种延时 *** 作,比如延时生产,还有延时拉取(DelayedFetch)、延时数据删除(DelayedDeleteRecords)等。
延时 *** 作创建之后会被加入延时 *** 作管理器(DelayedOperationPurgatory)来做专门的处理。延时 *** 作有可能会超时,每个延时 *** 作管理器都会配备一个定时器(SystemTimer)来做超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的。
为了实现生产者的幂等性,Kafka 为此引入了 producer id(以下简称 PID)和序列号(sequence number)这两个概念。
Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在
初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。
Kafka中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子 *** 作来处理,同时成功或失败,即使该生产或消费会跨多个分区。
生产者必须提供唯一的transactionalId,启动后请求事务协调器获取一个PID,transactionalId与PID一一对应。
每次发送数据给<Topic, Partition>前,需要先向事务协调器发送AddPartitionsToTxnRequest,事务协调器会将该<Transaction, Topic, Partition>存于__transaction_state内,并将其状态置为BEGIN。
在处理完 AddOffsetsToTxnRequest 之后,生产者还会发送 TxnOffsetCommitRequest 请求给 GroupCoordinator,从而将本次事务中包含的消费位移信息 offsets 存储到主题 __consumer_offsets 中
一旦上述数据写入 *** 作完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。
在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实的主题中,消费者所订阅的还是真实的主题。
Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所
有 topic 的分区副本分配和 leader 选举等工作。Controller 的管理工作都是依赖于 Zookeeper 的。
以上就是关于【kafka】kafka数据迁移、分区副本重分配全部的内容,包括:【kafka】kafka数据迁移、分区副本重分配、Spark-从Kafka读取数据、【大数据技术】kafka简介和底层实现等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)