Kafka RoundRobinPartitioner distributes messages unevenly across partitions


Root cause

The problem originates in the producer's doSend method. When the accumulator returns a result with the abortForNewBatch flag set to true, doSend calls the partition method a second time for the same record, and the partition chosen by the first call goes unused. This is especially harmful for a topic with only two partitions: in that case only one partition ever receives data.

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

if (result.abortForNewBatch) {
    int prevPartition = partition;
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    // Second partition() call for the same record: the partition chosen
    // above is never written to, and the round-robin counter advances again.
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

    result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
...
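The skew is easy to reproduce with a back-of-the-envelope simulation in plain Java, with no Kafka dependency. The pathological case assumed here is that every append aborts for a new batch (for example, a small batch.size under steady load), so partition() runs twice per record and the counter advances by two each time:

```java
import java.util.Arrays;

public class RoundRobinSkipDemo {

    // Simulates the double partition() call per record that happens when
    // every append aborts for a new batch. The first call's result is
    // discarded; the second call picks the partition actually written to.
    static int[] simulate(int numPartitions, int records) {
        int counter = 0;
        int[] hits = new int[numPartitions];
        for (int i = 0; i < records; i++) {
            counter++;                         // first partition() call, result thrown away
            hits[counter++ % numPartitions]++; // second call after onNewBatch
        }
        return hits;
    }

    public static void main(String[] args) {
        // With two partitions, every record lands on the same partition.
        System.out.println(Arrays.toString(simulate(2, 10))); // prints [0, 10]
    }
}
```

With an odd partition count the counter still cycles through all partitions, which is why the imbalance is most dramatic with exactly two.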
Solution

The problem can be worked around with a custom round-robin partitioner like the one below, which remembers the partition abandoned in onNewBatch and hands it out on the next partition() call instead of skipping it:

package com.summer.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MyRoundRobin implements Partitioner {

    // Per-topic round-robin counter, as in the stock RoundRobinPartitioner.
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    // Partition abandoned by an aborted batch, to be reused on the next call.
    private final ConcurrentMap<String, AtomicInteger> unusedPartition = new ConcurrentHashMap<>();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Reuse the partition that an aborted batch left behind, if any.
        if (unusedPartition.containsKey(topic))
            return unusedPartition.remove(topic).get();

        return nextPartition(topic, cluster);
    }

    public int nextPartition(String topic, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = counterNextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int counterNextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
        return counter.getAndIncrement();
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        // Stash the partition doSend is abandoning so the next partition()
        // call hands it out instead of skipping it.
        unusedPartition.put(topic, new AtomicInteger(prevPartition));
    }
}
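To put the custom partitioner into service, point partitioner.class at it in the producer configuration. A minimal sketch follows; the broker address and serializer classes are illustrative placeholders, not values from the original post:

```java
import java.util.Properties;

public class ProducerSetup {

    // Builds producer properties that swap in the custom round-robin
    // partitioner in place of the default one.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("partitioner.class", "com.summer.kafka.MyRoundRobin");
        return props;
    }

    public static void main(String[] args) {
        // These properties would be passed to new KafkaProducer<>(props).
        System.out.println(producerProps().getProperty("partitioner.class"));
    }
}
```

Because the class is loaded by name, MyRoundRobin must be on the producer's classpath at runtime.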
