package org.apache.skywalking.oap.server.receiver.zipkin.kafka;

import com.google.gson.Gson;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.server.pool.CustomThreadFactory;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.trace.SpanForward;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.SpanBytesDecoderDetector;

/* loaded from: input_file:org/apache/skywalking/oap/server/receiver/zipkin/kafka/KafkaHandler.class */
public class KafkaHandler {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaHandler.class);
    private final ZipkinReceiverConfig config;
    private final SpanForward spanForward;
    private final Properties properties = new Properties();
    private final ThreadPoolExecutor executor;
    private final boolean enableKafkaMessageAutoCommit;
    private final List<String> topics;
    private final CounterMetrics msgDroppedIncr;
    private final CounterMetrics errorCounter;
    private final HistogramMetrics histogram;

    public KafkaHandler(ZipkinReceiverConfig zipkinReceiverConfig, ModuleManager moduleManager) {
        this.config = zipkinReceiverConfig;
        this.spanForward = new SpanForward(zipkinReceiverConfig, moduleManager);
        this.properties.setProperty("group.id", zipkinReceiverConfig.getKafkaGroupId());
        this.properties.setProperty("bootstrap.servers", zipkinReceiverConfig.getKafkaBootstrapServers());
        this.properties.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        this.properties.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
        this.properties.putAll((Properties) new Gson().fromJson(zipkinReceiverConfig.getKafkaConsumerConfig(), Properties.class));
        int kafkaHandlerThreadPoolSize = zipkinReceiverConfig.getKafkaHandlerThreadPoolSize() > 0 ? zipkinReceiverConfig.getKafkaHandlerThreadPoolSize() : Runtime.getRuntime().availableProcessors() * 2;
        int kafkaHandlerThreadPoolQueueSize = zipkinReceiverConfig.getKafkaHandlerThreadPoolQueueSize() > 0 ? zipkinReceiverConfig.getKafkaHandlerThreadPoolQueueSize() : 10000;
        this.enableKafkaMessageAutoCommit = new ConsumerConfig(this.properties).getBoolean("enable.auto.commit").booleanValue();
        this.executor = new ThreadPoolExecutor(kafkaHandlerThreadPoolSize, kafkaHandlerThreadPoolSize, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue(kafkaHandlerThreadPoolQueueSize), new CustomThreadFactory("Zipkin-Kafka-Consumer"), new ThreadPoolExecutor.CallerRunsPolicy());
        this.topics = Arrays.asList(zipkinReceiverConfig.getKafkaTopic().split(","));
        MetricsCreator service = moduleManager.find("telemetry").provider().getService(MetricsCreator.class);
        this.histogram = service.createHistogramMetric("trace_in_latency", "The process latency of trace data", new MetricsTag.Keys(new String[]{"protocol"}), new MetricsTag.Values(new String[]{"zipkin-kafka"}), new double[0]);
        this.msgDroppedIncr = service.createCounter("trace_dropped_count", "The dropped number of traces", new MetricsTag.Keys(new String[]{"protocol"}), new MetricsTag.Values(new String[]{"zipkin-kafka"}));
        this.errorCounter = service.createCounter("trace_analysis_error_count", "The error number of trace analysis", new MetricsTag.Keys(new String[]{"protocol"}), new MetricsTag.Values(new String[]{"zipkin-kafka"}));
    }

    public void start() throws ModuleStartException {
        for (int i = 0; i < this.config.getKafkaConsumers(); i++) {
            KafkaConsumer kafkaConsumer = new KafkaConsumer(this.properties);
            kafkaConsumer.subscribe(this.topics);
            kafkaConsumer.seekToEnd(kafkaConsumer.assignment());
            this.executor.submit(() -> {
                runTask(kafkaConsumer);
            });
        }
    }

    private void runTask(KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        if (log.isDebugEnabled()) {
            log.debug("Start Consume zipkin trace records from kafka.");
        }
        while (true) {
            try {
                ConsumerRecords poll = kafkaConsumer.poll(Duration.ofMillis(1000L));
                if (log.isDebugEnabled()) {
                    log.debug("Consume zipkin trace records from kafka, records count:[{}].", Integer.valueOf(poll.count()));
                }
                if (!poll.isEmpty()) {
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        byte[] bArr = (byte[]) ((ConsumerRecord) it.next()).value();
                        if (bArr.length < 2) {
                            this.msgDroppedIncr.inc();
                        } else {
                            this.executor.submit(() -> {
                                handleRecord(bArr);
                            });
                        }
                    }
                    if (!this.enableKafkaMessageAutoCommit) {
                        kafkaConsumer.commitAsync();
                    }
                }
            } catch (Exception e) {
                log.error("Kafka handle message error.", e);
                this.errorCounter.inc();
            }
        }
    }

    private void handleRecord(byte[] bArr) {
        HistogramMetrics.Timer createTimer = this.histogram.createTimer();
        Throwable th = null;
        try {
            try {
                this.spanForward.send(SpanBytesDecoderDetector.decoderForListMessage(bArr).decodeList(bArr));
                if (createTimer != null) {
                    if (0 == 0) {
                        createTimer.close();
                        return;
                    }
                    try {
                        createTimer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createTimer != null) {
                if (th != null) {
                    try {
                        createTimer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createTimer.close();
                }
            }
            throw th4;
        }
    }
}
