
不过要注意一些注意事项,对于多个partition和多个consumer
1 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
2 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀
最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
3 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
4 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
5 High-level接口中获取不到数据的时候是会block的
简单版,
简单的坑,如果测试流程是,先produce一些数据,然后再用consumer读的话,记得加上第一句设置
因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前produce的数据的,而且这个时候你再加上smallest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset
Properties props = new Properties();
propsput("autooffsetreset", "smallest"); //必须要加,如果要读旧数据
propsput("zookeeperconnect", "localhost:2181");
propsput("groupid", "pv");
propsput("zookeepersessiontimeoutms", "400");
propsput("zookeepersynctimems", "200");
propsput("autocommitintervalms", "1000");
ConsumerConfig conf = new ConsumerConfig(props);
ConsumerConnector consumer = kafkaconsumerConsumercreateJavaConsumerConnector(conf);
String topic = "page_visits";
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMapput(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumercreateMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMapget(topic);
KafkaStream<byte[], byte[]> stream = streamsget(0);
ConsumerIterator<byte[], byte[]> it = streamiterator();
while (ithasNext()){
Systemoutprintln("message: " + new String(itnext()message()));
}
if (consumer != null) consumershutdown(); //其实执行不到,因为上面的hasNext会block
在用high-level的consumer时,两个给力的工具,
1 bin/kafka-run-classsh kafkatoolsConsumerOffsetChecker --group pv
可以看到当前group offset的状况,比如这里看pv的状况,3个partition
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none
关键就是offset,logSize和Lag
这里以前读完了,所以offset=logSize,并且Lag=0
2 bin/kafka-run-classsh kafkatoolsUpdateOffsetsInZK earliest config/consumerproperties page_visits
3个参数,
[earliest | latest],表示将offset置到哪里
consumerproperties ,这里是配置文件的路径
topic,topic名,这里是page_visits
我们对上面的pv group执行完这个 *** 作后,再去check group offset状况,结果如下,
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 0 21 21 none
pv page_visits 1 0 19 19 none
pv page_visits 2 0 20 20 none
可以看到offset已经被清0,Lag=logSize
底下给出原文中多线程consumer的完整代码
import kafkaconsumerConsumerConfig;
import kafkaconsumerKafkaStream;
import kafkajavaapiconsumerConsumerConnector;
import javautilHashMap;
import javautilList;
import javautilMap;
import javautilProperties;
import javautilconcurrentExecutorService;
import javautilconcurrentExecutors;
public class ConsumerGroupExample {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafkaconsumerConsumercreateJavaConsumerConnector( // 创建Connector,注意下面对conf的配置
createConsumerConfig(a_zookeeper, a_groupId));
thistopic = a_topic;
}
public void shutdown() {
if (consumer != null) consumershutdown();
if (executor != null) executorshutdown();
}
public void run(int a_numThreads) { // 创建并发的consumers
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMapput(topic, new Integer(a_numThreads)); // 描述读取哪个topic,需要几个线程读
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumercreateMessageStreams(topicCountMap); // 创建Streams
List<KafkaStream<byte[], byte[]>> streams = consumerMapget(topic); // 每个线程对应于一个KafkaStream
// now launch all the threads
//
executor = ExecutorsnewFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executorsubmit(new ConsumerTest(stream, threadNumber)); // 启动consumer thread
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
propsput("zookeeperconnect", a_zookeeper);
propsput("groupid", a_groupId);
propsput("zookeepersessiontimeoutms", "400");
propsput("zookeepersynctimems", "200");
propsput("autocommitintervalms", "1000");
return new ConsumerConfig(props);
}
public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = IntegerparseInt(args[3]);
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
examplerun(threads);
try {
Threadsleep(10000);
} catch (InterruptedException ie) {
}
exampleshutdown();
}
}
SimpleConsumer
另一种是SimpleConsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口
参考,>
首先这种系统一直正在获取跟新数据,可能是因为网络不好的原因,因为如果系统在进行更新处理的时候,是需要连接网络的,如果网络卡顿或者不好的情况下,可能会反应不过来,会一直导致正在获取新数据的过程的,因此才会这样的。
获取最新数据就会显示。如下参考:
1打开电脑,打开mysql数据库,点击数据库,在右上角输入查询,点击新查询下面的zd查询。如图。
2然后可以通过gmt_create从crew_1中输入SELECT,表中的所有记录都将按时间排序,如图所示。
3如果需要获得按时间排序的表中的第一条记录,请输入SELECTfromcrew_1orderbygmt_createdesclimit0,1,如图所示。
4如果您需要获得第五个记录,请输入SELECTfromcrew_1orderbygmt_createdesclimit4,1,如下所示。
5如果需要获取1001记录,只需将limit4,1更改为limit1000,1。如果需要获取n条记录,在查询语句中添加limitn-1,1,如图所示。
6如果需要获取表中的前n条记录,则更改为限制n,如图所示。
以上就是关于kafka consumer重新连接后如何获取当前最新数据全部的内容,包括:kafka consumer重新连接后如何获取当前最新数据、苹果6s 邮箱为何无法自动获取新数据我是这么 *** 作的: 设置—邮件 、通讯录、 日历—获取新数据、miui系统跟新一直正在获取跟新数据等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)