背景

由于项目上Flink在设置parallel多于1的情况下,job没法正确地获取watermark,所以周末来研究一下一部分,大概已经锁定了原因:
虽然我们的topic只设置了1的partition,但是Kafka的Comsumer还是起了好几个subtask去读索引是2、3的partition,然后这几个subtask的watermark一直不更新,导致我们job整体的watermark一直是Long.MIN_VALUE。现在需要去了解一下subtask获取partition的流程,等上班的时候debug一遍应该就可以知道原因。

翻源码的过程

通过log找到分配partition的大概位置

find partition

从图中可以看到,在org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase这个类中可以找到一些关键信息。

跟踪源码

log partition

往上翻翻,看有没有有用信息

all partitions

关键源码,附上注释

    public void open(Configuration configuration) throws Exception {
        // determine the offset commit mode
        this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
                getIsAutoCommitEnabled(),
                enableCommitOnCheckpoints,
                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

        // create the partition discoverer
        this.partitionDiscoverer = createPartitionDiscoverer(
                topicsDescriptor,
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
        this.partitionDiscoverer.open();

        subscribedPartitionsToStartOffsets = new HashMap<>();
        // 重点函数,这个函数或获取到subtask的所有partition。
        final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
        if (restoredState != null) {
            ...
        } else {
            // use the partition discoverer to fetch the initial seed partitions,
            // and set their initial offsets depending on the startup mode.
            // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
            // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
            // when the partition is actually read.
            switch (startupMode) {
                ...
                default:
                    for (KafkaTopicPartition seedPartition : allPartitions) {
                        subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
                    }
            }

            if (!subscribedPartitionsToStartOffsets.isEmpty()) {
                switch (startupMode) {
                    ...
                    case GROUP_OFFSETS:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                }
            } else {
                LOG.info("Consumer subtask {} initially has no partitions to read from.",
                    getRuntimeContext().getIndexOfThisSubtask());
            }
        }

    public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
        if (!closed && !wakeup) {
            try {
                List<KafkaTopicPartition> newDiscoveredPartitions;

                // (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic pattern
                if (topicsDescriptor.isFixedTopics()) {
                    // 对于没有使用通配符的topic,直接获取topic的所有partition
                    newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
                } else {
                    // 对于使用了通配符的topic, 先找到所有topic,再一一match
                    List<String> matchedTopics = getAllTopics();

                    // retain topics that match the pattern
                    Iterator<String> iter = matchedTopics.iterator();
                    while (iter.hasNext()) {
                        if (!topicsDescriptor.isMatchingTopic(iter.next())) {
                            iter.remove();
                        }
                    }

                    if (matchedTopics.size() != 0) {
                        // get partitions only for matched topics
                        newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
                    } else {
                        newDiscoveredPartitions = null;
                    }
                }

                // (2) eliminate partition that are old partitions or should not be subscribed by this subtask
                if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
                    throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
                } else {
                    Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
                    KafkaTopicPartition nextPartition;
                    while (iter.hasNext()) {
                        nextPartition = iter.next();
                        // 只保留符合要求的partition,这就是我们要找的函数
                        if (!setAndCheckDiscoveredPartition(nextPartition)) {
                            iter.remove();
                        }
                    }
                }

                return newDiscoveredPartitions;
            }...
        }...
    }
    
    public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
        if (isUndiscoveredPartition(partition)) {
            discoveredPartitions.add(partition);

            // 在这
            return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
        }

        return false;
    }


    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        // 先算出此topic的hash(partition.getTopic().hashCode() * 31),这里不知道为什么不直接用hash,还要再*31,然后取正数(& 0x7FFFFFFF),最后获取到此topic的起始位置。
        int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

        // here, the assumption is that the id of Kafka partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the start index
        // 计算当前的partition应该属于哪个subtask。例如:一共有20个subtask,算出来的起始位置是5,partition是5,那么最后就是
        // (5 + 5) % 20 = 10, 这个partition应该分给10号subtask。
        return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }

思考

某topic的每个partition会分给哪个subtask其实是确定的

topic名字是确定的 -> topic的hashCode是确定的 && subtask的数量是确定的 -> startIndex是确定的 -> 某partition会分给哪个subtask其实是确定的

为什么要算startIndex

大概是为了平均分配不同的topic,如果topic很多,每个topic都只从0开始,那么subtask 0,1,2之类的靠前subtask就需要读大量的partition。

版权声明:本文为boydfd原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/boydfd/p/11748127.html