
The producer's "doSend" method causes this problem. In "doSend", when the accumulator returns a result with the abortForNewBatch flag set to true, doSend calls the "partition" method a second time, and the previously chosen partition goes unused. This is dangerous when a topic has only two partitions, because in that case only one of the two partitions ever gets used.
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
if (result.abortForNewBatch) {
    int prevPartition = partition;
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
...
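The skew is easiest to see with a counter-based round-robin partitioner and two partitions. The sketch below is an illustrative simulation, not Kafka's actual code: it assumes every send aborts for a new batch (roughly what happens with small batches), so doSend advances the counter twice per record and the parity never changes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simulation of the skew: a naive round-robin partitioner whose choice
// is discarded once per send because of abortForNewBatch.
public class SkewSimulation {
    static final AtomicInteger counter = new AtomicInteger(0);
    static final int NUM_PARTITIONS = 2;

    // Naive round-robin: just counter % numPartitions.
    static int partition() {
        return counter.getAndIncrement() % NUM_PARTITIONS;
    }

    public static void main(String[] args) {
        // Each send: the first choice is abandoned when the batch aborts,
        // doSend calls partition() again, and only the second result is used.
        for (int i = 0; i < 4; i++) {
            int first = partition();   // chosen, then abandoned
            int second = partition();  // actually used
            System.out.println("abandoned=" + first + " used=" + second);
        }
    }
}
```

Because the counter advances twice per record, the "used" value is always the same partition: all traffic lands on partition 1 while partition 0 starves.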
Workaround
The problem can be worked around with a custom round-robin partitioner like this:
package com.summer.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MyRoundRobin implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    // Partition chosen before an aborted batch; replayed on the next call so it is not wasted.
    private final ConcurrentMap<String, AtomicInteger> unusedPartition = new ConcurrentHashMap<>();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // If onNewBatch stashed a partition for this topic, reuse it instead of advancing the counter.
        AtomicInteger stashed = unusedPartition.remove(topic);
        if (stashed != null)
            return stashed.get();
        return nextPartition(topic, cluster);
    }

    public int nextPartition(String topic, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = counterNextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // No partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int counterNextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
        return counter.getAndIncrement();
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        // Remember the partition that doSend abandoned so the retried append lands on it.
        unusedPartition.put(topic, new AtomicInteger(prevPartition));
    }
}
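To put the custom partitioner into effect, it only needs to be registered via the producer's `partitioner.class` property. A minimal sketch of the configuration (the `localhost:9092` bootstrap address is an assumption for illustration):

```java
import java.util.Properties;

public class ProducerSetup {
    // Builds producer properties that register the custom round-robin partitioner.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Register the partitioner from the article instead of the default sticky one.
        props.put("partitioner.class", "com.summer.kafka.MyRoundRobin");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("partitioner.class"));
    }
}
```

These properties are then passed to `new KafkaProducer<>(props)` as usual; every batch-abort retry will now land on the stashed partition instead of skipping it.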