kafka consumer重新连接后如何获取当前最新数据

kafka consumer重新连接后如何获取当前最新数据,第1张

不过要注意一些注意事项,对于多个partition和多个consumer

1 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数

2 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀

最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目

3 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同

4 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化

5 High-level接口中获取不到数据的时候是会block的

简单版,

简单的坑,如果测试流程是,先produce一些数据,然后再用consumer读的话,记得加上第一句设置

因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前produce的数据的,而且这个时候你再加上smallest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset

Properties props = new Properties();

propsput("autooffsetreset", "smallest"); //必须要加,如果要读旧数据

propsput("zookeeperconnect", "localhost:2181");

propsput("groupid", "pv");

propsput("zookeepersessiontimeoutms", "400");

propsput("zookeepersynctimems", "200");

propsput("autocommitintervalms", "1000");

ConsumerConfig conf = new ConsumerConfig(props);

ConsumerConnector consumer = kafkaconsumerConsumercreateJavaConsumerConnector(conf);

String topic = "page_visits";

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

topicCountMapput(topic, new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumercreateMessageStreams(topicCountMap);

List<KafkaStream<byte[], byte[]>> streams = consumerMapget(topic);

KafkaStream<byte[], byte[]> stream = streamsget(0);

ConsumerIterator<byte[], byte[]> it = streamiterator();

while (ithasNext()){

Systemoutprintln("message: " + new String(itnext()message()));

}

if (consumer != null) consumershutdown(); //其实执行不到,因为上面的hasNext会block

在用high-level的consumer时,两个给力的工具,

1 bin/kafka-run-classsh kafkatoolsConsumerOffsetChecker --group pv

可以看到当前group offset的状况,比如这里看pv的状况,3个partition

Group Topic Pid Offset logSize Lag Owner

pv page_visits 0 21 21 0 none

pv page_visits 1 19 19 0 none

pv page_visits 2 20 20 0 none

关键就是offset,logSize和Lag

这里以前读完了,所以offset=logSize,并且Lag=0

2 bin/kafka-run-classsh kafkatoolsUpdateOffsetsInZK earliest config/consumerproperties page_visits

3个参数,

[earliest | latest],表示将offset置到哪里

consumerproperties ,这里是配置文件的路径

topic,topic名,这里是page_visits

我们对上面的pv group执行完这个 *** 作后,再去check group offset状况,结果如下,

Group Topic Pid Offset logSize Lag Owner

pv page_visits 0 0 21 21 none

pv page_visits 1 0 19 19 none

pv page_visits 2 0 20 20 none

可以看到offset已经被清0,Lag=logSize

底下给出原文中多线程consumer的完整代码

import kafkaconsumerConsumerConfig;

import kafkaconsumerKafkaStream;

import kafkajavaapiconsumerConsumerConnector;

import javautilHashMap;

import javautilList;

import javautilMap;

import javautilProperties;

import javautilconcurrentExecutorService;

import javautilconcurrentExecutors;

public class ConsumerGroupExample {

private final ConsumerConnector consumer;

private final String topic;

private ExecutorService executor;

public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {

consumer = kafkaconsumerConsumercreateJavaConsumerConnector( // 创建Connector,注意下面对conf的配置

createConsumerConfig(a_zookeeper, a_groupId));

thistopic = a_topic;

}

public void shutdown() {

if (consumer != null) consumershutdown();

if (executor != null) executorshutdown();

}

public void run(int a_numThreads) { // 创建并发的consumers

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

topicCountMapput(topic, new Integer(a_numThreads)); // 描述读取哪个topic,需要几个线程读

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumercreateMessageStreams(topicCountMap); // 创建Streams

List<KafkaStream<byte[], byte[]>> streams = consumerMapget(topic); // 每个线程对应于一个KafkaStream

// now launch all the threads

//

executor = ExecutorsnewFixedThreadPool(a_numThreads);

// now create an object to consume the messages

//

int threadNumber = 0;

for (final KafkaStream stream : streams) {

executorsubmit(new ConsumerTest(stream, threadNumber)); // 启动consumer thread

threadNumber++;

}

}

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {

Properties props = new Properties();

propsput("zookeeperconnect", a_zookeeper);

propsput("groupid", a_groupId);

propsput("zookeepersessiontimeoutms", "400");

propsput("zookeepersynctimems", "200");

propsput("autocommitintervalms", "1000");

return new ConsumerConfig(props);

}

public static void main(String[] args) {

String zooKeeper = args[0];

String groupId = args[1];

String topic = args[2];

int threads = IntegerparseInt(args[3]);

ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);

examplerun(threads);

try {

Threadsleep(10000);

} catch (InterruptedException ie) {

}

exampleshutdown();

}

}

SimpleConsumer

另一种是SimpleConsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口

参考,

什么时候用这个接口

Read a message multiple times

Consume only a subset of the partitions in a topic in a process

Manage transactions to make sure a message is processed once and only once

当然用这个接口是有代价的,即partition,broker,offset对你不再透明,需要自己去管理这些,并且还要handle broker leader的切换,很麻烦

所以不是一定要用,最好别用

You must keep track of the offsets in your application to know where you left off consuming

You must figure out which Broker is the lead Broker for a topic and partition

You must handle Broker leader changes

使用SimpleConsumer的步骤:

Find an active Broker and find out which Broker is the leader for your topic and partition

Determine who the replica Brokers are for your topic and partition

Build the request defining what data you are interested in

Fetch the data

Identify and recover from leader changes

首先,你必须知道读哪个topic的哪个partition

然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker

再者,自己去写request并fetch数据

最终,还要注意需要识别和处理broker leader的改变

基本概念

名称 解释

Broker 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群

Topic Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic

Producer 消息生产者,向Broker发送消息的客户端

Consumer 消息消费者,从Broker读取消息的客户端

ConsumerGroup 每个Consumer属于一个特定的Consumer Group,一条消息可以发送到多个不同的Consumer Group,但是一个Consumer Group中只能有一个Consumer能够消费该消息

Partition 物理上的概念,一个topic可以分为多个partition,每个partition内部是有序的

Consumer Group

consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程

groupid是一个字符串,唯一标识一个consumer group

consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)

Rebalance

触发Rebalance的条件

组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了-这两者的区别后面会谈到)

订阅主题数发生变更-这当然是可能的,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance

订阅主题的分区数发生变更

均衡策略:

range和round-robin

Coordinator

确定consumer group位移信息写入__consumers_offsets的哪个分区。具体计算公式:

__consumers_offsets partition# = Mathabs(groupIdhashCode() % groupMetadataTopicPartitionCount)

注意:groupMetadataTopicPartitionCount由offsetstopicnumpartitions指定,默认是50个分区。该分区leader所在的broker就是被选定的coordinator

Coordinator同Consumer Group之间的协议:

Heartbeat请求:consumer需要定期给coordinator发送心跳来表明自己还活着

LeaveGroup请求:主动告诉coordinator我要离开consumer group

SyncGroup请求:group leader把分配方案告诉组内所有成员

JoinGroup请求:成员请求加入组

DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用

Kafka Streaming

Stream Processing Topology

1、stream是Kafka Stream最重要的抽象,它代表了一个无限持续的数据集。stream是有序的、可重放消息、对不可变数据集支持故障转移

2、一个stream processing application由一到多个processor topologies组成,其中每个processor topology是一张图,由多个streams(edges)连接着多个stream processor(node)

3、一个stream processor是processor topology中的一个节点,它代表一个在stream中的处理步骤:从上游processors接受数据、进行一些处理、最后发送一到多条数据到下游processors

Kafka Stream提供两种开发stream processing topology的API

1、high-level Stream DSL:提供通用的数据 *** 作,如map和fileter

2、lower-level Processor API:提供定义和连接自定义processor,同时跟state store(下文会介绍)交互

Kafka-Streams的一些亮点:

Source Processor:它是一种特殊的stream processor,没有上游processor。它通过消费一个或者多个topic中的记录为自身的拓扑生成输入流,然后将他们转发到下游processor

Sink Processors:也是一种特殊的stream processor,他没有下游processor。它发送从上游processor接收到的所有记录到一个特定的Kakfa Topic

Time

Event Time:

Processiong Time:

Ingestion Time:

Kafka Stream基于TimestampExtractor接口获取对应data记录的时间戳信息;

State

无状态是指消息的处理不依赖与其他消息,而有状态则相反;比如join/groupby *** 作均为有状态;

Kafka Streams提供State Store机制;

源码分析

运行架构

TopologyBuilder builder = new TopologyBuilder();

builderaddSource("Source", "streams-file-input");

builderaddProcessor("Process", new MyProcessorSupplier(), "Source");

builderaddStateStore(Storescreate("Counts")withStringKeys()withIntegerValues()inMemory()build(), "Process");

builderaddSink("Sink", "streams-wordcount-processor-output", "Process");

KafkaStreams streams = new KafkaStreams(builder, props);

streamsstart();

// usually the stream application would be running forever,

// in this example we just let it run for some time and stop since the input data is finite

Threadsleep(5000L);

streamsclose();

TopologyBuilder: 构建Kafka Topology关系,主要方法有addSource()/addProcessor()/addStateStore()/addSink();

KafkaStreams: 为Kafka Streams运行上下文,开启关闭整个应用;

KafkaStreams启动流程:

GlobalStreamThread

StreamThread

疑问: Topic+Partition -> Thread -> Task之间的关系是什么?如何进行调度的?

TopologyBuilder构建Topology过程

public synchronized ProcessorTopology buildGlobalStateTopology() {

final Set<String> globalGroups = globalNodeGroups();

if (globalGroupsisEmpty()) {

return null;

}

return build(globalGroups);

}

名称 类型 含义

nodeToSourceTopics HashMap

运维

惊群与脑裂

这两种现象存在于>=09的版本中,当时consumer是依赖zookeeper的。

惊群: 任何Broker和Consumer的增减都会触发所有Consumer的Rebalance,导致同一个group里不同的consumer拥有了相同的partition,进而引起Consumer的消费错乱;

脑裂: 每个consumer单独通过zookeeper判断哪些broker和consumer宕机,不同的consumer在同一个时刻看到的view可能不一致,导致Rebanlance;

计算

参考:

flink 中已经预置了 kafka 相关的数据源实现 FlinkKafkaConsumer010 ,先看下具体的实现:

kafka Consumer 有一堆实现,不过最终都是继承自 FlinkKafkaConsumerBase ,而这个抽象类则是继承 RichParallelSourceFunction ,是不是很眼熟,跟自定义 mysql 数据源继承的抽象类 RichSourceFunction 很类似。

可以看到,这里有很多构造函数,我们直接使用即可。

说明:

a、这里直接使用 properties 对象来设置 kafka 相关配置,比如 brokers zk groupId 序列化 反序列化 等。

b、使用 FlinkKafkaConsumer010 构造函数,指定 topic properties 配置

c、 SimpleStringSchema 仅针对 String 类型数据的序列化及反序列化,如果 kafka 中消息的内容不是 String ,则会报错;看下 SimpleStringSchema 的定义:

d、这里直接把获取到的消息打印出来。

以上就是关于kafka consumer重新连接后如何获取当前最新数据全部的内容,包括:kafka consumer重新连接后如何获取当前最新数据、kafkaStream、Flink 配置Kafka数据源等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/web/9308056.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-04-27
下一篇2023-04-27

发表评论

登录后才能评论

评论列表(0条)

    保存