kafka获取数据的几种方式

kafka获取数据的几种方式,第1张

一、基于Receiver的方式

这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。

然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

如何进行Kafka数据源连接

1、在maven添加依赖

<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.4.1</version></dependency>

2、scala代码

val kafkaStream = {val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"val kafkaParams = Map("zookeeper.connect" ->"zookeeper1:2181","group.id" ->"spark-streaming-test","zookeeper.connection.timeout.ms" ->"1000")val inputTopic = "input-topic"val numPartitionsOfInputTopic = 5val streams = (1 to numPartitionsOfInputTopic) map { _ =>KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic ->1), StorageLevel.MEMORY_ONLY_SER).map(_._2)}val unifiedStream = ssc.union(streams)val sparkProcessingParallelism = 1 // You'd probably pick a higher value than 1 in production.unifiedStream.repartition(sparkProcessingParallelism)}

需要注意的要点

1、Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。

2、可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。

3、如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER。

二、基于Direct的方式

这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

这种方式有如下优点:

1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union *** 作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

3、一次且仅一次的事务机制:

基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。

基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

scala连接代码

val topics = Set("teststreaming")val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092" val kafkaParams = Map[String, String]("metadata.broker.list" ->brokers, "serializer.class" ->"kafka.serializer.StringEncoder")// Create a direct stream val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)val events = kafkaStream.flatMap(line =>{Some(line.toString())})

三、总结:两种方式在生产中都有广泛的应用,新api的Direct应该是以后的首选方式。

基本概念

名称 解释

Broker 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群

Topic Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic

Producer消息生产者,向Broker发送消息的客户端

Consumer消息消费者,从Broker读取消息的客户端

ConsumerGroup 每个Consumer属于一个特定的Consumer Group,一条消息可以发送到多个不同的Consumer Group,但是一个Consumer Group中只能有一个Consumer能够消费该消息

Partition 物理上的概念,一个topic可以分为多个partition,每个partition内部是有序的

Consumer Group

consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程

group.id是一个字符串,唯一标识一个consumer group

consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)

Rebalance

触发Rebalance的条件

组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了-这两者的区别后面会谈到)

订阅主题数发生变更-这当然是可能的,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance

订阅主题的分区数发生变更

均衡策略:

range和round-robin

Coordinator

确定consumer group位移信息写入__consumers_offsets的哪个分区。具体计算公式:

__consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)

注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。该分区leader所在的broker就是被选定的coordinator

Coordinator同Consumer Group之间的协议:

Heartbeat请求:consumer需要定期给coordinator发送心跳来表明自己还活着

LeaveGroup请求:主动告诉coordinator我要离开consumer group

SyncGroup请求:group leader把分配方案告诉组内所有成员

JoinGroup请求:成员请求加入组

DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用

Kafka Streaming

Stream Processing Topology

1、stream是Kafka Stream最重要的抽象,它代表了一个无限持续的数据集。stream是有序的、可重放消息、对不可变数据集支持故障转移

2、一个stream processing application由一到多个processor topologies组成,其中每个processor topology是一张图,由多个streams(edges)连接着多个stream processor(node)

3、一个stream processor是processor topology中的一个节点,它代表一个在stream中的处理步骤:从上游processors接受数据、进行一些处理、最后发送一到多条数据到下游processors

Kafka Stream提供两种开发stream processing topology的API

1、high-level Stream DSL:提供通用的数据 *** 作,如map和fileter

2、lower-level Processor API:提供定义和连接自定义processor,同时跟state store(下文会介绍)交互

Kafka-Streams的一些亮点:

Source Processor:它是一种特殊的stream processor,没有上游processor。它通过消费一个或者多个topic中的记录为自身的拓扑生成输入流,然后将他们转发到下游processor

Sink Processors:也是一种特殊的stream processor,他没有下游processor。它发送从上游processor接收到的所有记录到一个特定的Kakfa Topic

Time

Event Time:

Processiong Time:

Ingestion Time:

Kafka Stream基于TimestampExtractor接口获取对应data记录的时间戳信息;

State

无状态是指消息的处理不依赖与其他消息,而有状态则相反;比如join/groupby *** 作均为有状态;

Kafka Streams提供State Store机制;

源码分析

运行架构

TopologyBuilder builder = new TopologyBuilder()

builder.addSource("Source", "streams-file-input")

builder.addProcessor("Process", new MyProcessorSupplier(), "Source")

builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process")

builder.addSink("Sink", "streams-wordcount-processor-output", "Process")

KafkaStreams streams = new KafkaStreams(builder, props)

streams.start()

// usually the stream application would be running forever,

// in this example we just let it run for some time and stop since the input data is finite.

Thread.sleep(5000L)

streams.close()

TopologyBuilder: 构建Kafka Topology关系,主要方法有addSource()/addProcessor()/addStateStore()/addSink()

KafkaStreams: 为Kafka Streams运行上下文,开启关闭整个应用;

KafkaStreams启动流程:

GlobalStreamThread

StreamThread

疑问: Topic+Partition ->Thread ->Task之间的关系是什么?如何进行调度的?

TopologyBuilder构建Topology过程

public synchronized ProcessorTopology buildGlobalStateTopology() {

final Set<String>globalGroups = globalNodeGroups()

if (globalGroups.isEmpty()) {

return null

}

return build(globalGroups)

}

名称 类型 含义

nodeToSourceTopics HashMap

运维

惊群与脑裂

这两种现象存在于>=0.9的版本中,当时consumer是依赖zookeeper的。

惊群: 任何Broker和Consumer的增减都会触发所有Consumer的Rebalance,导致同一个group里不同的consumer拥有了相同的partition,进而引起Consumer的消费错乱;

脑裂: 每个consumer单独通过zookeeper判断哪些broker和consumer宕机,不同的consumer在同一个时刻看到的view可能不一致,导致Rebanlance

计算

参考:


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/sjk/6830460.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-03-28
下一篇2023-03-28

发表评论

登录后才能评论

评论列表(0条)

    保存