
Using Kafka in Spring Boot
Add the Kafka dependencies (pom.xml):
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.6.0</version>
</dependency>
Message producer utility class:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SendMessage {

    private static final Logger log = LoggerFactory.getLogger(SendMessage.class);

    public static void sendKafkaMessage(String msg) {
        Properties properties = new Properties();
        // Kafka broker address; separate multiple addresses with commas
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.kafka_url);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        try {
            // Target topic and message payload
            ProducerRecord<String, String> record = new ProducerRecord<>(Constants.topic, msg);
            // send() is asynchronous; the callback reports success or failure
            kafkaProducer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    log.info("Message sent successfully: {}", msg);
                } else {
                    log.error("Message send failed: {}", msg, exception);
                }
            });
        } finally {
            // close() flushes any buffered records before releasing resources
            kafkaProducer.close();
        }
    }
}
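For reference, a minimal sketch of how this utility could be called from a Spring Boot bean; the MessageService class and its publish method are illustrative names, not part of the original:

import org.springframework.stereotype.Service;

@Service
public class MessageService {

    // Delegates to the static utility above; any caller (controller,
    // scheduler, listener) can send a message the same way.
    public void publish(String payload) {
        SendMessage.sendKafkaMessage(payload);
    }
}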
Message consumer utility class:
import java.time.Duration;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HandlerMessage {

    private static final Logger log = LoggerFactory.getLogger(HandlerMessage.class);

    public static void main(String[] args) {
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConfigUtil().initKafkaConfig();
        while (true) {
            // Wait up to one second for new records
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                log.info("Received Message: {}", record.value());
                // Parse the message payload as JSON (fastjson)
                JSONObject kafkaMsg = JSONObject.parseObject(record.value());
                log.info("jsonObject: {}", kafkaMsg.toString());
            }
            // Commit offsets asynchronously after processing the batch
            kafkaConsumer.commitAsync();
        }
    }
}
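The KafkaConfigUtil used above is not shown in this excerpt. A minimal sketch of what such a helper might look like, reusing Constants.kafka_url and Constants.topic from the producer and assuming a hypothetical Constants.group_id:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConfigUtil {

    // Builds a consumer subscribed to the topic used by the producer above.
    public KafkaConsumer<String, String> initKafkaConfig() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.kafka_url);
        // Constants.group_id is an assumed field; use your own consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, Constants.group_id);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Offsets are committed manually via commitAsync() in the poll loop
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(Constants.topic));
        return consumer;
    }
}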
Creating the Kafka topic:
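The original snippet for this step is missing from this excerpt. One option, consistent with the kafka-clients dependency above, is to create the topic programmatically with the AdminClient; the partition count and replication factor below are placeholder values:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.kafka_url);

        try (AdminClient adminClient = AdminClient.create(properties)) {
            // 1 partition, replication factor 1 -- placeholders for a single-broker setup
            NewTopic newTopic = new NewTopic(Constants.topic, 1, (short) 1);
            // Block until the broker confirms the topic was created
            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
        }
    }
}

Alternatively, the topic can be created from the command line with the kafka-topics.sh script shipped with Kafka.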