
##查看topic分布情况kafka-list-topic.sh
bin/kafka-list-topic.sh
--zookeeper
192.168.197.170:2181,192.168.197.171:2181
(列出所有topic的分区情况)
bin/kafka-list-topic.sh
--zookeeper
192.168.197.170:2181,192.168.197.171:2181
--topic
test
(查看test的分区情况)
其实kafka-list-topic.sh里面就一句
exec
$(dirname
$0)/kafka-run-class.sh
kafka.admin.listtopiccommand
$@
实际是通过
kafka-run-class.sh脚本执行的包kafka.admin下面的类
##创建topic
kafka-create-topic.sh
bin/kafka-create-topic.sh
--replica
2
--partition
8
--topic
test
--zookeeper
192.168.197.170:2181,192.168.197.171:2181
创建名为test的topic,
8个分区分别存放数据,数据备份总共2份
bin/kafka-create-topic.sh
--replica
1
--partition
1
--topic
test2
--zookeeper
192.168.197.170:2181,192.168.197.171:2181
结果
topic:
test2
partition:
0
leader:
170
replicas:
170
isr:
170
##重新分配分区kafka-reassign-partitions.sh
这个命令可以分区指定到想要的--broker-list上
bin/kafka-reassign-partitions.sh
--topics-to-move-json-file
topics-to-move.json
--broker-list
"171"
--zookeeper
192.168.197.170:2181,192.168.197.171:2181
--execute
cat
topic-to-move.json
{"topics":
[{"topic":
"test2"}],
"version":1
}
##为topic增加
partition数目kafka-add-partitions.sh
bin/kafka-add-partitions.sh
--topic
test
--partition
2
--zookeeper
192.168.197.170:2181,192.168.197.171:2181
(为topic
test增加2个分区)
##控制台接收消息
bin/kafka-console-consumer.sh
--zookeeper
192.168.197.170:2181,192.168.197.171:2181
--from-beginning
--topic
test
##控制台发送消息
bin/kafka-console-producer.sh
--broker-list
192.168.197.170:9092,192.168.197.171:
9092
--topic
test
##手动均衡topic,
kafka-preferred-replica-election.sh
bin/kafka-preferred-replica-election.sh
--zookeeper
192.168.197.170:2181,192.168.197.171:2181
--path-to-json-file
preferred-click.json
cat
preferred-click.json
{
"partitions":
[
{"topic":
"click",
"partition":
0},
{"topic":
"click",
"partition":
1},
{"topic":
"click",
"partition":
2},
{"topic":
"click",
"partition":
3},
{"topic":
"click",
"partition":
4},
{"topic":
"click",
"partition":
5},
{"topic":
"click",
"partition":
6},
{"topic":
"click",
"partition":
7},
{"topic":
"play",
"partition":
0},
{"topic":
"play",
"partition":
1},
{"topic":
"play",
"partition":
2},
{"topic":
"play",
"partition":
3},
{"topic":
"play",
"partition":
4},
{"topic":
"play",
"partition":
5},
{"topic":
"play",
"partition":
6},
{"topic":
"play",
"partition":
7}
]
}
##删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除
bin/kafka-run-class.sh
kafka.admin.deletetopiccommand
--topic
test666
--zookeeper
192.168.197.170:2181
,192.168.197.171:2181
输入以下代码即可查看kafka状态:
接上图:
BROKER_HOST是kafka server的ip地址,PORTt是server的监听端口。多个host port之间用逗号隔开。
第一条命令是获取group列表,一般而言,应用是知道消费者group的,通常在应用的配置里,如果已知,该步骤可以省略。
第二条命令是查看具体的消费者group的详情信息,需要给出group的名称。
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。
对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)