package com.centit.dde.test.producer;

import java.text.SimpleDateFormat;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:com/centit/dde/test/producer/Producer.class */
public class Producer {
    public static void main(String[] strArr) throws InterruptedException {
        new AtomicInteger(0);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        properties.put("acks", "all");
        properties.put("retries", 10);
        properties.put("retries", 10);
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        new Thread(() -> {
            while (true) {
                try {
                    ProducerRecord producerRecord = new ProducerRecord("dde.demo.topic", 0, (Object) null, initMsg1());
                    long currentTimeMillis = System.currentTimeMillis();
                    kafkaProducer.send(producerRecord, (recordMetadata, exc) -> {
                        if (recordMetadata != null) {
                            System.out.println("topic：" + recordMetadata.topic() + "，分区：" + recordMetadata.partition() + "，偏移量：" + recordMetadata.offset() + "，时间：" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Long.valueOf(recordMetadata.timestamp())));
                        }
                        if (exc != null) {
                            System.out.println(exc.getMessage());
                        }
                    });
                    System.out.println("异步发送结束，耗时：" + (System.currentTimeMillis() - currentTimeMillis) + "第" + atomicInteger.getAndAdd(1) + "条！");
                    Thread.sleep(10000L);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    private static String initMsg1() {
        return "新版本 Consumer 的位移管理机制其实也很简单，就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息，提交到 __consumer_offsets 中。可以这么说，__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。它要求这个提交过程不仅要实现高持久性，还要支持高频的写操作。显然，Kafka 的主题设计天然就满足这两个条件，因此，使用 Kafka 主题来保存位移这件事情，实际上就是一个水到渠成的想法了。";
    }

    private static String initMsg2() {
        return "其实对于这些基本概念的普及，网上资料实在太多了。我本不应该再画蛇添足了，但为了本文的完整性，我还是要花一些篇幅来重谈consumer group，至少可以说说我的理解。值得一提的是，由于我们今天基本上只探讨consumer group，对于单独的消费者不做过多讨论。";
    }

    private static String initMsg3() {
        return "消费者在消费的过程中需要记录自己消费了多少数据，即消费位置信息。在Kafka中这个位置信息有个专门的术语：位移(offset)。很多消息引擎都把这部分信息保存在服务器端(broker端)。这样做的好处当然是实现简单，但会有三个主要的问题：1. broker从此变成有状态的，会影响伸缩性；2. 需要引入应答机制(acknowledgement)来确认消费成功。3. 由于要保存很多consumer的offset信息，必然引入复杂的数据结构，造成资源浪费。而Kafka选择了不同的方式：每个consumer group保存自己的位移信息，那么只需要简单的一个整数表示位置就够了；同时可以引入checkpoint机制定期持久化，简化了应答机制的实现。";
    }

    private static String initMsg4() {
        return "正则表达式匹配主题测试！";
    }
}
