Kafka生产者

Kafka生产者,第1张

Kafka生产者

文章目录
  • Kafka生产者
    • 1、消息发送流程
    • 2、异步发送API
  • 分区策略
  • 数据可靠性保证
    • 关于follower同步过程中出现的问题:ISR
    • ack应答级别
    • leader和follower故障处理
    • Exactly Once语义
    • producer事务

Kafka生产者 1、消息发送流程

Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。

相关参数:

batch.size:只有数据积累到batch.size之后,sender才会发送数据。达到该大小则发送

linger.ms:如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。若在该时间内未达到batch.size大小,则发送

备注:在实际工作中 linger.ms=0;

2、异步发送API
  1. 导入依赖

    
            
                org.apache.kafka
                kafka-clients
                2.4.1
            
    
    
  2. 不带回调函数代码

    (1)创建生产者配置对象

    (2)添加配置信息

    (3)创建生产者对象

    (4)调用send发送消息

    (5)关闭资源

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    public class CustomProducer {
        public static void main(String[] args) throws InterruptedException {
            // 1. 创建kafka生产者的配置对象
            Properties properties = new Properties();
            // 2. 给kafka配置对象添加配置信息
            properties.put("bootstrap.servers","hadoop102:9092");
            // 批次大小 默认16K
            properties.put("batch.size", 16384);
            // 等待时间
            properties.put("linger.ms", 1);
            // RecordAccumulator缓冲区大小 默认32M
            properties.put("buffer.memory", 33554432);
            // key,value序列化
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            // 3. 创建kafka生产者对象
            KafkaProducer kafkaProducer = new KafkaProducer(properties);
    
            // 4. 调用send方法,发送消息
            for (int i = 0; i < 10; i++) {
                kafkaProducer.send(new ProducerRecord<>("first","kafka" + i));
            }
            // 5. 关闭资源
            kafkaProducer.close();
        }
    } 
    
  3. 带回调函数的API

    回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是Recordmetadata和Exception,如果Exception为null,说明消息发送成功,如果Exception不为null,说明消息发送失败。

    注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

    package com.hpu.kafka;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.Recordmetadata;
    
    import java.util.Properties;
    
    
    public class Proceducer1 {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers","hadoop102:9092");
            properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
            // 设置ack
            properties.put("acks", "all");
            KafkaProducer kafkaProducer = new KafkaProducer(properties);
    
            for (int i = 0; i < 10; i++) {
                // 添加回调
                kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i), new Callback() {
                    // 该方法在Producer收到ack时调用,为异步调用
                    @Override
                    public void onCompletion(Recordmetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println(metadata);
                        } else {
                            exception.printStackTrace();
                        }
                    }
                });
            }
            kafkaProducer.close();
        }
    }
    
  4. 同步发送

    同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。

    package com.hpu.kafka;
    
    
    import org.apache.kafka.clients.producer.*;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    
    public class Proceducer2 {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Properties properties = new Properties();
            properties.put("bootstrap.servers","hadoop102:9092");
            properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    
            KafkaProducer kafkaProducer = new KafkaProducer(properties);
    
            for (int i = 0; i < 100; i++) {
                Recordmetadata first = kafkaProducer.send(new ProducerRecord<>("first", "kafka" + i)).get();
                System.out.println(first);
            }
    
            kafkaProducer.close();
        }
    }
    
分区策略

分区的目的:

  • 提高扩展性
  • 提高并发

如何分区?

  • 在ProducerRecord对象中指定partition值,指定分区。
  • 通过key计算hash值与topic的分区数取模计算得到。
  • 若既没有指定也没有key,则采用Sticky Partition随机选择分区直至达到batch.size。之后再随机一个分区使用。
// 指定分区
        for (int i = 0; i < 10; i++) {
        // 指定发送到1号分区
            kafkaProducer.send(new ProducerRecord<>("first",1,"","kafka" + i));
            // 线程睡眠,避免全部发送到一个分区
            Thread.sleep(2);
        }
// 通过key计算分区
        for (int i = 0; i < 10; i++) {
            // 根据key的hash值分配分区
            kafkaProducer.send(new ProducerRecord<>("first","abc","kafka" + i));
            // 提供线程睡眠,避免发送到同一个分区
            Thread.sleep(2);
        }

自定义分区器

package com.hpu.kafka.partition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;


public class MyPartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int partition;
        String s = value.toString();
        if (s.contains("zyn")){
            partition = 0;
        } else {
            partition = 1;
        }
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map configs) {

    }
}

通过ProducerConfig.PARTITIONER_CLASS_CONFIG指定自定义分区器

package com.hpu.kafka.partition;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;


public class MyProceducer {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.hpu.kafka.partition.MyPartition");

        KafkaProducer kafkaProducer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            if (i%2==0){
                kafkaProducer.send(new ProducerRecord<>("first", "zyn666"), new Callback() {
                    @Override
                    public void onCompletion(Recordmetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println(metadata);
                        } else {
                            exception.printStackTrace();
                        }
                    }
                });
            } else {
                kafkaProducer.send(new ProducerRecord<>("first", "zzz777"), new Callback() {
                    @Override
                    public void onCompletion(Recordmetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println(metadata);
                        } else {
                            exception.printStackTrace();
                        }
                    }
                });
            }

            Thread.sleep(20);
        }
        kafkaProducer.close();
    }
}
数据可靠性保证

​ 为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。

方案优点缺点半数以上完成同步,就发送ack延迟低选举新的leader时,容忍n台节点的故障,需要2n+1个副本全部完成同步,才发送ack选举新的leader时,容忍n台节点的故障,需要n+1个副本延迟高

选择第二种方案的原因:

  • 第一种方案需要存储过多副本数,产生冗余;
  • 第二种方案相较与第一种方案延迟高,但kafka内部通讯,延迟影响较小。
关于follower同步过程中出现的问题:ISR

​ 所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步的问题?

​ Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给producer发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。

ack应答级别

对一些数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。

acks为0:partition的leader接收到消息还没有写入磁盘就已经返回ack,当leader故障时有可能丢失数据;

acks为1:partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;

acks为-1(all):partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。这是因为producer未收到ack,将往新的leader重新发送这部分内容,导致重复。

leader和follower故障处理

(1)follower故障

follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。

(2)leader故障

leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。

Exactly Once语义

At Least once(acks为-1)可以保证数据不丢失,但是不能保证数据不重复;相对的,At Most once(acks为0)可以保证数据不重复,但是不能保证数据不丢失。0.11版本的Kafka,引入了幂等性。

幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义

启用幂等性,只需要将Producer的参数中enable.idempotence设置为true即可。

Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。

但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。

接下来的Producer事务可以解决以上困扰。

producer事务

为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5685424.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-17

发表评论

登录后才能评论

评论列表(0条)

    保存