
如何编写一个kafka生产者?
1、pom文件中导入
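<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.1</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>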
2、yml配置文件中加入kafka配置
spring:
  kafka:
    producer:
      bootstrap-servers: ip:port
      batch-size: 16384
      retries: 0
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
3、新建一个类
@Component
public class KafkaProducerTest implements Runnable {
// Producer client; not initialised here, it is created inside run().
private KafkaProducer kafkaProducer;
// Name of the topic every record is sent to.
private final String topic = "test_producer";
// Broker address list, injected from spring.kafka.producer.bootstrap-servers.
@Value("${spring.kafka.producer.bootstrap-servers}")
private String address;
// Key serializer class name, injected from spring.kafka.producer.key-serializer.
// NOTE(review): not read anywhere in the visible code — confirm whether it is still needed.
@Value("${spring.kafka.producer.key-serializer}")
private String key;
// Value serializer class name, injected from spring.kafka.producer.value-serializer.
// NOTE(review): not read anywhere in the visible code — confirm whether it is still needed.
@Value("${spring.kafka.producer.value-serializer}")
private String value;
/**
 * Builds the configuration used to create the Kafka producer.
 *
 * <p>Only producer settings are set here. Consumer settings such as {@code group.id},
 * {@code session.timeout.ms} and {@code auto.commit.interval.ms} do not apply to a
 * producer and are deliberately left out.
 *
 * @return properties holding the broker address and the String serializers for key and value
 */
private Properties initProperties() {
    Properties props = new Properties();
    // Broker list injected from the Spring configuration.
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, address);
    // Record keys are serialized as strings.
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    // Record values are serialized as strings.
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    return props;
}
/**
 * Sends one message to the configured topic.
 *
 * @param key   number used as the record key; it is sent in its decimal string form
 * @param value message payload
 */
public void senMessage(int key, String value) {
    // The key is converted to text because the producer is configured with a String key serializer.
    kafkaProducer.send(new ProducerRecord<String, String>(topic, String.valueOf(key), value));
}
/**
 * Creates the producer and sends one million test messages to the topic.
 *
 * <p>Messages are numbered 1 through 1,000,000. The producer is closed once the loop
 * finishes, whether normally or because of an exception, so that records still buffered
 * in the client are transmitted and the client's resources are released.
 */
@Override
public void run() {
    final int total = 1000000;
    kafkaProducer = new KafkaProducer(initProperties());
    int sent = 0;
    try {
        for (int i = 1; i <= total; i++) {
            senMessage(i, "这是第" + i + "条数据。");
            sent = i;
        }
    } finally {
        // send() only queues records; close() waits for the queued records to be sent.
        kafkaProducer.close();
    }
    System.out.println("共插入数量:" + sent);
} 欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)