package org.apache.flink.connector.kafka.source.reader;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.cli.HelpFormatter;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.class */
public class KafkaPartitionSplitReader implements SplitReader<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaPartitionSplitReader.class);
    private static final long POLL_TIMEOUT = 10000;
    private final KafkaConsumer<byte[], byte[]> consumer;
    private final Map<TopicPartition, Long> stoppingOffsets;
    private final String groupId;
    private final int subtaskId;
    private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics;
    private final Set<String> emptySplits = new HashSet();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader$KafkaPartitionSplitRecords.class */
    public static class KafkaPartitionSplitRecords implements RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> {
        private final Set<String> finishedSplits;
        private final Map<TopicPartition, Long> stoppingOffsets;
        private final ConsumerRecords<byte[], byte[]> consumerRecords;
        private final KafkaSourceReaderMetrics metrics;
        private final Iterator<TopicPartition> splitIterator;
        private Iterator<ConsumerRecord<byte[], byte[]>> recordIterator;
        private TopicPartition currentTopicPartition;
        private Long currentSplitStoppingOffset;

        private KafkaPartitionSplitRecords(ConsumerRecords<byte[], byte[]> consumerRecords, KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
            this.finishedSplits = new HashSet();
            this.stoppingOffsets = new HashMap();
            this.consumerRecords = consumerRecords;
            this.splitIterator = consumerRecords.partitions().iterator();
            this.metrics = kafkaSourceReaderMetrics;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setPartitionStoppingOffset(TopicPartition topicPartition, long j) {
            this.stoppingOffsets.put(topicPartition, Long.valueOf(j));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void addFinishedSplit(String str) {
            this.finishedSplits.add(str);
        }

        @Override // org.apache.flink.connector.base.source.reader.RecordsWithSplitIds
        @Nullable
        public String nextSplit() {
            if (this.splitIterator.hasNext()) {
                this.currentTopicPartition = this.splitIterator.next();
                this.recordIterator = this.consumerRecords.records(this.currentTopicPartition).iterator();
                this.currentSplitStoppingOffset = this.stoppingOffsets.getOrDefault(this.currentTopicPartition, Long.MAX_VALUE);
                return this.currentTopicPartition.toString();
            }
            this.currentTopicPartition = null;
            this.recordIterator = null;
            this.currentSplitStoppingOffset = null;
            return null;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.flink.connector.base.source.reader.RecordsWithSplitIds
        @Nullable
        public ConsumerRecord<byte[], byte[]> nextRecordFromSplit() {
            Preconditions.checkNotNull(this.currentTopicPartition, "Make sure nextSplit() did not return null before iterate over the records split.");
            if (!this.recordIterator.hasNext()) {
                return null;
            }
            ConsumerRecord<byte[], byte[]> next = this.recordIterator.next();
            if (next.offset() >= this.currentSplitStoppingOffset.longValue()) {
                return null;
            }
            this.metrics.recordCurrentOffset(this.currentTopicPartition, next.offset());
            return next;
        }

        @Override // org.apache.flink.connector.base.source.reader.RecordsWithSplitIds
        public Set<String> finishedSplits() {
            return this.finishedSplits;
        }
    }

    public KafkaPartitionSplitReader(Properties properties, SourceReaderContext sourceReaderContext, KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
        this.subtaskId = sourceReaderContext.getIndexOfSubtask();
        this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.setProperty("client.id", createConsumerClientId(properties));
        this.consumer = new KafkaConsumer<>(properties2);
        this.stoppingOffsets = new HashMap();
        this.groupId = properties2.getProperty("group.id");
        maybeRegisterKafkaConsumerMetrics(properties, kafkaSourceReaderMetrics, this.consumer);
        this.kafkaSourceReaderMetrics.registerNumBytesIn(this.consumer);
    }

    @Override // org.apache.flink.connector.base.source.reader.splitreader.SplitReader
    public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOException {
        try {
            ConsumerRecords<byte[], byte[]> poll = this.consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
            KafkaPartitionSplitRecords kafkaPartitionSplitRecords = new KafkaPartitionSplitRecords(poll, this.kafkaSourceReaderMetrics);
            ArrayList arrayList = new ArrayList();
            for (TopicPartition topicPartition : poll.partitions()) {
                long stoppingOffset = getStoppingOffset(topicPartition);
                List<ConsumerRecord<byte[], byte[]>> records = poll.records(topicPartition);
                if (records.size() > 0) {
                    ConsumerRecord<byte[], byte[]> consumerRecord = records.get(records.size() - 1);
                    if (consumerRecord.offset() >= stoppingOffset - 1) {
                        kafkaPartitionSplitRecords.setPartitionStoppingOffset(topicPartition, stoppingOffset);
                        finishSplitAtRecord(topicPartition, stoppingOffset, consumerRecord.offset(), arrayList, kafkaPartitionSplitRecords);
                    }
                }
                this.kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(this.consumer, topicPartition);
            }
            if (!this.emptySplits.isEmpty()) {
                kafkaPartitionSplitRecords.finishedSplits.addAll(this.emptySplits);
                this.emptySplits.clear();
            }
            if (!arrayList.isEmpty()) {
                KafkaSourceReaderMetrics kafkaSourceReaderMetrics = this.kafkaSourceReaderMetrics;
                kafkaSourceReaderMetrics.getClass();
                arrayList.forEach(kafkaSourceReaderMetrics::removeRecordsLagMetric);
                unassignPartitions(arrayList);
            }
            this.kafkaSourceReaderMetrics.updateNumBytesInCounter();
            return kafkaPartitionSplitRecords;
        } catch (WakeupException e) {
            return new KafkaPartitionSplitRecords(ConsumerRecords.empty(), this.kafkaSourceReaderMetrics);
        }
    }

    @Override // org.apache.flink.connector.base.source.reader.splitreader.SplitReader
    public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        ArrayList arrayList = new ArrayList();
        HashMap hashMap = new HashMap();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        ArrayList arrayList4 = new ArrayList();
        HashSet hashSet = new HashSet();
        splitsChange.splits().forEach(kafkaPartitionSplit -> {
            arrayList.add(kafkaPartitionSplit.getTopicPartition());
            parseStartingOffsets(kafkaPartitionSplit, arrayList2, arrayList3, hashMap);
            parseStoppingOffsets(kafkaPartitionSplit, arrayList4, hashSet);
            this.kafkaSourceReaderMetrics.registerTopicPartition(kafkaPartitionSplit.getTopicPartition());
        });
        arrayList.addAll(this.consumer.assignment());
        this.consumer.assign(arrayList);
        seekToStartingOffsets(arrayList2, arrayList3, hashMap);
        acquireAndSetStoppingOffsets(arrayList4, hashSet);
        removeEmptySplits();
        maybeLogSplitChangesHandlingResult(splitsChange);
    }

    @Override // org.apache.flink.connector.base.source.reader.splitreader.SplitReader
    public void wakeUp() {
        this.consumer.wakeup();
    }

    @Override // org.apache.flink.connector.base.source.reader.splitreader.SplitReader
    public void close() throws Exception {
        this.consumer.close();
    }

    public void notifyCheckpointComplete(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) {
        this.consumer.commitAsync(map, offsetCommitCallback);
    }

    @VisibleForTesting
    KafkaConsumer<byte[], byte[]> consumer() {
        return this.consumer;
    }

    private void parseStartingOffsets(KafkaPartitionSplit kafkaPartitionSplit, List<TopicPartition> list, List<TopicPartition> list2, Map<TopicPartition, Long> map) {
        TopicPartition topicPartition = kafkaPartitionSplit.getTopicPartition();
        if (kafkaPartitionSplit.getStartingOffset() == -2) {
            list.add(topicPartition);
        } else if (kafkaPartitionSplit.getStartingOffset() == -1) {
            list2.add(topicPartition);
        } else {
            if (kafkaPartitionSplit.getStartingOffset() == -3) {
                return;
            }
            map.put(topicPartition, Long.valueOf(kafkaPartitionSplit.getStartingOffset()));
        }
    }

    private void parseStoppingOffsets(KafkaPartitionSplit kafkaPartitionSplit, List<TopicPartition> list, Set<TopicPartition> set) {
        TopicPartition topicPartition = kafkaPartitionSplit.getTopicPartition();
        kafkaPartitionSplit.getStoppingOffset().ifPresent(l -> {
            if (l.longValue() >= 0) {
                this.stoppingOffsets.put(topicPartition, l);
            } else if (l.longValue() == -1) {
                list.add(topicPartition);
            } else {
                if (l.longValue() != -3) {
                    throw new FlinkRuntimeException(String.format("Invalid stopping offset %d for partition %s", l, topicPartition));
                }
                set.add(topicPartition);
            }
        });
    }

    private void seekToStartingOffsets(List<TopicPartition> list, List<TopicPartition> list2, Map<TopicPartition, Long> map) {
        if (!list.isEmpty()) {
            LOG.trace("Seeking starting offsets to beginning: {}", list);
            this.consumer.seekToBeginning(list);
        }
        if (!list2.isEmpty()) {
            LOG.trace("Seeking starting offsets to end: {}", list2);
            this.consumer.seekToEnd(list2);
        }
        if (map.isEmpty()) {
            return;
        }
        LOG.trace("Seeking starting offsets to specified offsets: {}", map);
        KafkaConsumer<byte[], byte[]> kafkaConsumer = this.consumer;
        kafkaConsumer.getClass();
        map.forEach((v1, v2) -> {
            r1.seek(v1, v2);
        });
    }

    private void acquireAndSetStoppingOffsets(List<TopicPartition> list, Set<TopicPartition> set) {
        this.stoppingOffsets.putAll(this.consumer.endOffsets(list));
        if (set.isEmpty()) {
            return;
        }
        this.consumer.committed(set).forEach((topicPartition, offsetAndMetadata) -> {
            Preconditions.checkNotNull(offsetAndMetadata, String.format("Partition %s should stop at committed offset. But there is no committed offset of this partition for group %s", topicPartition, this.groupId));
            this.stoppingOffsets.put(topicPartition, Long.valueOf(offsetAndMetadata.offset()));
        });
    }

    private void removeEmptySplits() {
        ArrayList arrayList = new ArrayList();
        for (TopicPartition topicPartition : this.consumer.assignment()) {
            if (this.consumer.position(topicPartition) >= getStoppingOffset(topicPartition)) {
                arrayList.add(topicPartition);
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        LOG.debug("These assigning splits are empty and will be marked as finished in later fetch: {}", arrayList);
        this.emptySplits.addAll((Collection) arrayList.stream().map(KafkaPartitionSplit::toSplitId).collect(Collectors.toSet()));
        unassignPartitions(arrayList);
    }

    private void maybeLogSplitChangesHandlingResult(SplitsChange<KafkaPartitionSplit> splitsChange) {
        if (LOG.isDebugEnabled()) {
            StringJoiner stringJoiner = new StringJoiner(",");
            for (KafkaPartitionSplit kafkaPartitionSplit : splitsChange.splits()) {
                stringJoiner.add(String.format("[%s, start:%d, stop: %d]", kafkaPartitionSplit.getTopicPartition(), Long.valueOf(this.consumer.position(kafkaPartitionSplit.getTopicPartition())), Long.valueOf(getStoppingOffset(kafkaPartitionSplit.getTopicPartition()))));
            }
            LOG.debug("SplitsChange handling result: {}", stringJoiner);
        }
    }

    private void unassignPartitions(Collection<TopicPartition> collection) {
        HashSet hashSet = new HashSet(this.consumer.assignment());
        hashSet.removeAll(collection);
        this.consumer.assign(hashSet);
    }

    private String createConsumerClientId(Properties properties) {
        return properties.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key()) + HelpFormatter.DEFAULT_OPT_PREFIX + this.subtaskId;
    }

    private void finishSplitAtRecord(TopicPartition topicPartition, long j, long j2, List<TopicPartition> list, KafkaPartitionSplitRecords kafkaPartitionSplitRecords) {
        LOG.debug("{} has reached stopping offset {}, current offset is {}", new Object[]{topicPartition, Long.valueOf(j), Long.valueOf(j2)});
        list.add(topicPartition);
        kafkaPartitionSplitRecords.addFinishedSplit(KafkaPartitionSplit.toSplitId(topicPartition));
    }

    private long getStoppingOffset(TopicPartition topicPartition) {
        return this.stoppingOffsets.getOrDefault(topicPartition, Long.MAX_VALUE).longValue();
    }

    private void maybeRegisterKafkaConsumerMetrics(Properties properties, KafkaSourceReaderMetrics kafkaSourceReaderMetrics, KafkaConsumer<?, ?> kafkaConsumer) {
        if (((Boolean) KafkaSourceOptions.getOption(properties, KafkaSourceOptions.REGISTER_KAFKA_CONSUMER_METRICS, Boolean::parseBoolean)).booleanValue()) {
            kafkaSourceReaderMetrics.registerKafkaConsumerMetrics(kafkaConsumer);
        }
    }
}
