
kafka是一个分布式流处理平台,是三大MQ中间件之一。是一种高吞吐量的分布式发布订阅消息系统。
快速认知概念Broker: kafka的服务端程序,可以认为一个mq节点就是一个broker。
Topic: 每条发布到mq的消息都有一个类别,称为topic,主题的意思。
Producer: 生产者,创建消息发送给mq的topic
Consumer: 消费者,消费队列中的消息
Partition: 是Topic的实际存储空间,一个Topic有一个或多个Partition。Partition是一个有序队列
Replication 副本:也就是partition,副本分为leader和follower,learder挂了后,follower会自动升级为leader,只有leader才能和producer和consumer交互
ConsumerGroup:消费者组,同一个消费者组里同时只能有一个消费者能从相同的partition消费消息
MQ模型点对点:所有消费者在同一个组里,每条消息只会被一个消费者消费
发布订阅:比如每个消费者都属于不同组,则kafka消息可以广播到每个消费者
springboot 中对topic的 *** 作
springboot依赖版本
org.springframework.kafka spring-kafka2.7.0
创建和展示topic详情
public class KafkaAdminTest {
public static final String TOPIC_NAME = "default_topic";
public KafkaAdmin kafkaAdmin(){
Map config = new HashMap<>();
//填上自己的IP和端口
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:port");
return new KafkaAdmin(config);
}
@Test
public void createTopic(){
KafkaAdmin kafkaAdmin = kafkaAdmin();
//设置topic参数 名称 partition数量 备份数量(1代表只有leader,没有follower) 备份数
量不能大于集群节点数量,否则报错
NewTopic newTopic = new NewTopic(TOPIC_NAME, 6, (short)1);
kafkaAdmin.createOrModifyTopics(newTopic);
}
@Test
public void describeTopics(){
KafkaAdmin kafkaAdmin = kafkaAdmin();
Map describeTopics = kafkaAdmin.describeTopics(TOPIC_NAME);
Set> entries = describeTopics.entrySet();
entries.stream().forEach((entry)-> System.err.println("name :"+entry.getKey()+" , desc: "+ entry.getValue()));
}
对于更高级的功能,您可以AdminClient直接使用。KafkaAdmin内部也是使用AdminClient
public class KafkaAdminTest {
private static final String TOPIC_NAME = "default_topic";
public static AdminClient initAdminClient(){
Properties properties = new Properties();
//填上自己的IP和端口
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"ip:port");
AdminClient adminClient = AdminClient.create(properties);
return adminClient;
}
@Test
public void createTopicTest(){
AdminClient adminClient = initAdminClient();
//指定分区数量,副本数量不能大于集群节点数量
NewTopic newTopic = new NewTopic(TOPIC_NAME,6,(short) 1);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
try {
//future等待创建,成功则不会有任何报错
createTopicsResult.all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Test
public void listTopicTest() throws ExecutionException, InterruptedException {
AdminClient adminClient = initAdminClient();
//是否查看内部的topic,可以不用
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
ListTopicsResult listTopicsResult = adminClient.listTopics(options);
Set topics = listTopicsResult.names().get();
for(String name : topics){
System.err.println(name);
}
}
@Test
public void delTopicTest() throws ExecutionException, InterruptedException {
AdminClient adminClient = initAdminClient();
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
deleteTopicsResult.all().get();
}
@Test
public void detailTopicTest() throws ExecutionException, InterruptedException {
AdminClient adminClient = initAdminClient();
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
Map stringTopicDescriptionMap = describeTopicsResult.all().get();
Set> entries = stringTopicDescriptionMap.entrySet();
entries.stream().forEach((entry)-> System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue()));
}
@Test
public void incrPartitionTopicTest() throws ExecutionException, InterruptedException {
Map infoMap = new HashMap<>(1);
AdminClient adminClient = initAdminClient();
//分区数量不能比原有的数量小
NewPartitions newPartitions = NewPartitions.increaseTo(8);
infoMap.put(TOPIC_NAME,newPartitions);
CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap);
createPartitionsResult.all().get();
}
}
上述代码对topic的 *** 作,在公司中可能并不需要开发人员 *** 作,下一期会讲解spingboot中生产者和消费者的代码开发。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)