
这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。
如何进行Kafka数据源连接
1、在maven添加依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.4.1</version></dependency>
2、scala代码
val kafkaStream = {val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"val kafkaParams = Map("zookeeper.connect" ->"zookeeper1:2181","group.id" ->"spark-streaming-test","zookeeper.connection.timeout.ms" ->"1000")val inputTopic = "input-topic"val numPartitionsOfInputTopic = 5val streams = (1 to numPartitionsOfInputTopic) map { _ =>KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic ->1), StorageLevel.MEMORY_ONLY_SER).map(_._2)}val unifiedStream = ssc.union(streams)val sparkProcessingParallelism = 1 // You'd probably pick a higher value than 1 in production.unifiedStream.repartition(sparkProcessingParallelism)}
需要注意的要点
1、Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。
2、可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
3、如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER。
二、基于Direct的方式
这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
这种方式有如下优点:
1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union *** 作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
3、一次且仅一次的事务机制:
基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。
基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。
scala连接代码
val topics = Set("teststreaming")val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092" val kafkaParams = Map[String, String]("metadata.broker.list" ->brokers, "serializer.class" ->"kafka.serializer.StringEncoder")// Create a direct stream val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)val events = kafkaStream.flatMap(line =>{Some(line.toString())})
三、总结:两种方式在生产中都有广泛的应用,新api的Direct应该是以后的首选方式。
基本概念名称 解释
Broker 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群
Topic Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic
Producer消息生产者,向Broker发送消息的客户端
Consumer消息消费者,从Broker读取消息的客户端
ConsumerGroup 每个Consumer属于一个特定的Consumer Group,一条消息可以发送到多个不同的Consumer Group,但是一个Consumer Group中只能有一个Consumer能够消费该消息
Partition 物理上的概念,一个topic可以分为多个partition,每个partition内部是有序的
Consumer Group
consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程
group.id是一个字符串,唯一标识一个consumer group
consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)
Rebalance
触发Rebalance的条件
组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了-这两者的区别后面会谈到)
订阅主题数发生变更-这当然是可能的,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance
订阅主题的分区数发生变更
均衡策略:
range和round-robin
Coordinator
确定consumer group位移信息写入__consumers_offsets的哪个分区。具体计算公式:
__consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。该分区leader所在的broker就是被选定的coordinator
Coordinator同Consumer Group之间的协议:
Heartbeat请求:consumer需要定期给coordinator发送心跳来表明自己还活着
LeaveGroup请求:主动告诉coordinator我要离开consumer group
SyncGroup请求:group leader把分配方案告诉组内所有成员
JoinGroup请求:成员请求加入组
DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用
Kafka Streaming
Stream Processing Topology
1、stream是Kafka Stream最重要的抽象,它代表了一个无限持续的数据集。stream是有序的、可重放消息、对不可变数据集支持故障转移
2、一个stream processing application由一到多个processor topologies组成,其中每个processor topology是一张图,由多个streams(edges)连接着多个stream processor(node)
3、一个stream processor是processor topology中的一个节点,它代表一个在stream中的处理步骤:从上游processors接受数据、进行一些处理、最后发送一到多条数据到下游processors
Kafka Stream提供两种开发stream processing topology的API
1、high-level Stream DSL:提供通用的数据 *** 作,如map和fileter
2、lower-level Processor API:提供定义和连接自定义processor,同时跟state store(下文会介绍)交互
Kafka-Streams的一些亮点:
Source Processor:它是一种特殊的stream processor,没有上游processor。它通过消费一个或者多个topic中的记录为自身的拓扑生成输入流,然后将他们转发到下游processor
Sink Processors:也是一种特殊的stream processor,他没有下游processor。它发送从上游processor接收到的所有记录到一个特定的Kakfa Topic
Time
Event Time:
Processiong Time:
Ingestion Time:
Kafka Stream基于TimestampExtractor接口获取对应data记录的时间戳信息;
State
无状态是指消息的处理不依赖与其他消息,而有状态则相反;比如join/groupby *** 作均为有状态;
Kafka Streams提供State Store机制;
源码分析
运行架构
TopologyBuilder builder = new TopologyBuilder()
builder.addSource("Source", "streams-file-input")
builder.addProcessor("Process", new MyProcessorSupplier(), "Source")
builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process")
builder.addSink("Sink", "streams-wordcount-processor-output", "Process")
KafkaStreams streams = new KafkaStreams(builder, props)
streams.start()
// usually the stream application would be running forever,
// in this example we just let it run for some time and stop since the input data is finite.
Thread.sleep(5000L)
streams.close()
TopologyBuilder: 构建Kafka Topology关系,主要方法有addSource()/addProcessor()/addStateStore()/addSink()
KafkaStreams: 为Kafka Streams运行上下文,开启关闭整个应用;
KafkaStreams启动流程:
GlobalStreamThread
StreamThread
疑问: Topic+Partition ->Thread ->Task之间的关系是什么?如何进行调度的?
TopologyBuilder构建Topology过程
public synchronized ProcessorTopology buildGlobalStateTopology() {
final Set<String>globalGroups = globalNodeGroups()
if (globalGroups.isEmpty()) {
return null
}
return build(globalGroups)
}
名称 类型 含义
nodeToSourceTopics HashMap
运维
惊群与脑裂
这两种现象存在于>=0.9的版本中,当时consumer是依赖zookeeper的。
惊群: 任何Broker和Consumer的增减都会触发所有Consumer的Rebalance,导致同一个group里不同的consumer拥有了相同的partition,进而引起Consumer的消费错乱;
脑裂: 每个consumer单独通过zookeeper判断哪些broker和consumer宕机,不同的consumer在同一个时刻看到的view可能不一致,导致Rebanlance
计算
参考:
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)