linux 怎样查看kafka的某topic数据?

linux 怎样查看kafka的某topic数据?,第1张

基于0.8.0版本。

##查看topic分布情况kafka-list-topic.sh

bin/kafka-list-topic.sh

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

(列出所有topic的分区情况)

bin/kafka-list-topic.sh

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

--topic

test

(查看test的分区情况)

其实kafka-list-topic.sh里面就一句

exec

$(dirname

$0)/kafka-run-class.sh

kafka.admin.listtopiccommand

$@

实际是通过

kafka-run-class.sh脚本执行的包kafka.admin下面的类

##创建topic

kafka-create-topic.sh

bin/kafka-create-topic.sh

--replica

2

--partition

8

--topic

test

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

创建名为test的topic,

8个分区分别存放数据,数据备份总共2份

bin/kafka-create-topic.sh

--replica

1

--partition

1

--topic

test2

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

结果

topic:

test2

partition:

0

leader:

170

replicas:

170

isr:

170

##重新分配分区kafka-reassign-partitions.sh

这个命令可以分区指定到想要的--broker-list上

bin/kafka-reassign-partitions.sh

--topics-to-move-json-file

topics-to-move.json

--broker-list

"171"

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

--execute

cat

topic-to-move.json

{"topics":

[{"topic":

"test2"}],

"version":1

}

##为topic增加

partition数目kafka-add-partitions.sh

bin/kafka-add-partitions.sh

--topic

test

--partition

2

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

(为topic

test增加2个分区)

##控制台接收消息

bin/kafka-console-consumer.sh

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

--from-beginning

--topic

test

##控制台发送消息

bin/kafka-console-producer.sh

--broker-list

192.168.197.170:9092,192.168.197.171:

9092

--topic

test

##手动均衡topic,

kafka-preferred-replica-election.sh

bin/kafka-preferred-replica-election.sh

--zookeeper

192.168.197.170:2181,192.168.197.171:2181

--path-to-json-file

preferred-click.json

cat

preferred-click.json

{

"partitions":

[

{"topic":

"click",

"partition":

0},

{"topic":

"click",

"partition":

1},

{"topic":

"click",

"partition":

2},

{"topic":

"click",

"partition":

3},

{"topic":

"click",

"partition":

4},

{"topic":

"click",

"partition":

5},

{"topic":

"click",

"partition":

6},

{"topic":

"click",

"partition":

7},

{"topic":

"play",

"partition":

0},

{"topic":

"play",

"partition":

1},

{"topic":

"play",

"partition":

2},

{"topic":

"play",

"partition":

3},

{"topic":

"play",

"partition":

4},

{"topic":

"play",

"partition":

5},

{"topic":

"play",

"partition":

6},

{"topic":

"play",

"partition":

7}

]

}

##删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除

bin/kafka-run-class.sh

kafka.admin.deletetopiccommand

--topic

test666

--zookeeper

192.168.197.170:2181

,192.168.197.171:2181

输入以下代码即可查看kafka状态:

接上图:

BROKER_HOST是kafka server的ip地址,PORTt是server的监听端口。多个host port之间用逗号隔开。

第一条命令是获取group列表,一般而言,应用是知道消费者group的,通常在应用的配置里,如果已知,该步骤可以省略。

第二条命令是查看具体的消费者group的详情信息,需要给出group的名称。

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/sjk/9409609.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-04-28
下一篇2023-04-28

发表评论

登录后才能评论

评论列表(0条)

    保存