/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor.resume.kafka;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.processor.resume.kafka.KafkaResumeStrategy;
import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfiguration;
import org.apache.camel.processor.resume.kafka.RecordError;
import org.apache.camel.resume.Cacheable;
import org.apache.camel.resume.Deserializable;
import org.apache.camel.resume.Offset;
import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.util.IOHelper;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SingleNodeKafkaResumeStrategy<T extends Resumable>
implements KafkaResumeStrategy<T> {
    private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
    private Consumer<byte[], byte[]> consumer;
    private Producer<byte[], byte[]> producer;
    private Duration pollDuration = Duration.ofSeconds(1L);
    private final Queue<RecordError> producerErrors = new ConcurrentLinkedQueue<RecordError>();
    private boolean subscribed;
    private ResumeAdapter adapter;
    private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
    private final ExecutorService executorService;
    private final ReentrantLock lock = new ReentrantLock();

    public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
        this.resumeStrategyConfiguration = resumeStrategyConfiguration;
        this.executorService = Executors.newSingleThreadExecutor();
    }

    public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration, ExecutorService executorService) {
        this.resumeStrategyConfiguration = resumeStrategyConfiguration;
        this.executorService = executorService;
    }

    protected void produce(byte[] key, byte[] message) throws ExecutionException, InterruptedException {
        ProducerRecord record = new ProducerRecord(this.resumeStrategyConfiguration.getTopic(), (Object)key, (Object)message);
        this.producer.send(record, (recordMetadata, e) -> {
            if (e != null) {
                LOG.error("Failed to send message {}", (Object)e.getMessage(), (Object)e);
                this.producerErrors.add(new RecordError(recordMetadata, e));
            }
        });
    }

    protected void doAdd(OffsetKey<?> key, Offset<?> offsetValue) {
        if (this.adapter instanceof Cacheable) {
            Cacheable cacheable = (Cacheable)this.adapter;
            cacheable.add(key, offsetValue);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateLastOffset(T offset) throws Exception {
        OffsetKey key = offset.getOffsetKey();
        Offset offsetValue = offset.getLastOffset();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Updating offset on Kafka with key {} to {}", key.getValue(), offsetValue.getValue());
        }
        ByteBuffer keyBuffer = key.serialize();
        ByteBuffer valueBuffer = offsetValue.serialize();
        try {
            this.lock.lock();
            this.produce(keyBuffer.array(), valueBuffer.array());
        }
        finally {
            this.lock.unlock();
        }
        this.doAdd(key, offsetValue);
    }

    public void loadCache() {
        if (!(this.adapter instanceof Deserializable)) {
            throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
        }
        CountDownLatch latch = new CountDownLatch(this.resumeStrategyConfiguration.getMaxInitializationRetries());
        this.executorService.submit(() -> this.refresh(latch));
        try {
            LOG.trace("Waiting for kafka resume strategy async initialization");
            if (!latch.await(this.resumeStrategyConfiguration.getMaxInitializationDuration().toMillis(), TimeUnit.MILLISECONDS)) {
                LOG.debug("The initialization timed out");
            }
            LOG.trace("Kafka resume strategy initialization complete");
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void refresh(CountDownLatch latch) {
        LOG.trace("Creating a offset cache refresher");
        try {
            this.consumer = this.createConsumer();
            this.subscribe(this.consumer);
            LOG.debug("Loading records from topic {}", (Object)this.resumeStrategyConfiguration.getTopic());
            this.consumer.subscribe(Collections.singletonList(this.getResumeStrategyConfiguration().getTopic()));
            this.poll(this.consumer, latch);
        }
        catch (WakeupException e) {
            LOG.info("Kafka consumer was interrupted during a blocking call");
        }
        catch (Exception e) {
            LOG.error("Error while refreshing the local cache: {}", (Object)e.getMessage(), (Object)e);
        }
        finally {
            if (this.consumer != null) {
                this.consumer.unsubscribe();
                this.consumer.close(Duration.ofSeconds(5L));
            }
        }
    }

    protected void poll(Consumer<byte[], byte[]> consumer, CountDownLatch latch) {
        Deserializable deserializable = (Deserializable)this.adapter;
        boolean initialized = false;
        while (true) {
            ConsumerRecords<byte[], byte[]> records = this.consume(consumer);
            for (ConsumerRecord record : records) {
                byte[] value = (byte[])record.value();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Read from Kafka at {} ({}): {}", new Object[]{Instant.ofEpochMilli(record.timestamp()), record.timestampType(), value});
                }
                if (deserializable.deserialize(ByteBuffer.wrap((byte[])record.key()), ByteBuffer.wrap((byte[])record.value()))) continue;
                LOG.warn("Deserializer indicates that this is the last record to deserialize");
            }
            if (initialized) continue;
            if (latch.getCount() == 1L) {
                initialized = true;
            }
            latch.countDown();
        }
    }

    protected void checkAndSubscribe(Consumer<byte[], byte[]> consumer, String topic) {
        if (!this.subscribed) {
            consumer.subscribe(Collections.singletonList(topic));
            this.subscribed = true;
        }
    }

    public void checkAndSubscribe(Consumer<byte[], byte[]> consumer, String topic, long remaining) {
        if (!this.subscribed) {
            consumer.subscribe(Collections.singletonList(topic), this.getConsumerRebalanceListener(consumer, remaining));
            this.subscribed = true;
        }
    }

    private ConsumerRebalanceListener getConsumerRebalanceListener(final Consumer<byte[], byte[]> consumer, final long remaining) {
        return new ConsumerRebalanceListener(){

            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> assignments) {
                for (TopicPartition assignment : assignments) {
                    long endPosition = consumer.position(assignment);
                    long startPosition = endPosition - remaining;
                    if (startPosition >= 0L) {
                        consumer.seek(assignment, startPosition);
                        continue;
                    }
                    LOG.info("Ignoring the seek command because the initial offset is negative (the topic is likely empty)");
                }
            }
        };
    }

    protected ConsumerRecords<byte[], byte[]> consume(Consumer<byte[], byte[]> consumer) {
        ConsumerRecords records = consumer.poll(this.pollDuration);
        if (!records.isEmpty()) {
            return records;
        }
        return ConsumerRecords.empty();
    }

    protected ConsumerRecords<byte[], byte[]> consume(int retries, Consumer<byte[], byte[]> consumer) {
        while (retries > 0) {
            ConsumerRecords records = consumer.poll(this.pollDuration);
            if (!records.isEmpty()) {
                return records;
            }
            --retries;
        }
        return ConsumerRecords.empty();
    }

    private void subscribe(Consumer<byte[], byte[]> consumer) {
        if (this.adapter instanceof Cacheable) {
            ResumeCache cache = ((Cacheable)this.adapter).getCache();
            if (cache.capacity() >= 1L) {
                this.checkAndSubscribe(consumer, this.resumeStrategyConfiguration.getTopic(), cache.capacity());
            } else {
                this.checkAndSubscribe(consumer, this.resumeStrategyConfiguration.getTopic());
            }
        } else {
            this.checkAndSubscribe(consumer, this.resumeStrategyConfiguration.getTopic());
        }
    }

    public ResumeAdapter getAdapter() {
        return this.adapter;
    }

    public void setAdapter(ResumeAdapter adapter) {
        this.adapter = adapter;
    }

    protected Collection<RecordError> getProducerErrors() {
        return Collections.unmodifiableCollection(this.producerErrors);
    }

    public void build() {
    }

    public void init() {
        LOG.debug("Initializing the Kafka resume strategy");
    }

    private void createProducer() {
        if (this.producer == null) {
            this.producer = new KafkaProducer(this.resumeStrategyConfiguration.getProducerProperties());
        }
    }

    private Consumer<byte[], byte[]> createConsumer() {
        return new KafkaConsumer(this.resumeStrategyConfiguration.getConsumerProperties());
    }

    public void stop() {
        try {
            LOG.trace("Trying to obtain a lock for closing the producer");
            if (!this.lock.tryLock(1L, TimeUnit.SECONDS)) {
                LOG.warn("Failed to obtain a lock for closing the producer. Force closing the producer ...");
            }
            LOG.info("Closing the Kafka producer");
            IOHelper.close(this.producer, (String)"Kafka producer", (Logger)LOG);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally {
            this.lock.unlock();
        }
        try {
            LOG.info("Closing the Kafka consumer");
            this.consumer.wakeup();
            this.executorService.shutdown();
            if (!this.executorService.awaitTermination(2L, TimeUnit.SECONDS)) {
                LOG.warn("Kafka consumer did not shutdown within 2 seconds");
                this.executorService.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void close() throws IOException {
        this.stop();
    }

    public void start() {
        LOG.info("Starting the kafka resume strategy");
        this.createProducer();
    }

    public Duration getPollDuration() {
        return this.pollDuration;
    }

    public void setPollDuration(Duration pollDuration) {
        this.pollDuration = Objects.requireNonNull(pollDuration, "The poll duration cannot be null");
    }

    protected Producer<byte[], byte[]> getProducer() {
        return this.producer;
    }

    public void resetProducerErrors() {
        this.producerErrors.clear();
    }

    protected KafkaResumeStrategyConfiguration getResumeStrategyConfiguration() {
        return this.resumeStrategyConfiguration;
    }
}

