package com.centit.dde.bizopt;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.centit.dde.core.BizModel;
import com.centit.dde.core.BizOperation;
import com.centit.dde.core.SimpleDataSet;
import com.centit.dde.producer.KafkaProducerConfig;
import com.centit.dde.producer.ProducerEntity;
import com.centit.framework.common.ResponseData;
import com.centit.product.adapter.po.SourceInfo;
import com.centit.product.metadata.dao.SourceInfoDao;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/* loaded from: input_file:com/centit/dde/bizopt/ProducerBizOperation.class */
/**
 * Business operation that publishes the content of one {@link BizModel} data set to a Kafka topic.
 *
 * <p>The operation is configured through a JSON object that is mapped onto a {@link ProducerEntity}
 * (topic, partition, key, source data set name, target id, connection id and the async flag).
 * A human-readable send result is stored back into the {@link BizModel} under the entity's id.
 */
public class ProducerBizOperation implements BizOperation {
    private SourceInfoDao sourceInfoDao;

    public ProducerBizOperation() {
    }

    public ProducerBizOperation(SourceInfoDao sourceInfoDao) {
        this.sourceInfoDao = sourceInfoDao;
    }

    /**
     * Serializes the configured source data set to JSON and sends it as a single Kafka record.
     *
     * <p>When the entity's async flag is {@code true} the record is sent with a callback that records
     * either the success message or the failure message. Otherwise the call blocks on the send future
     * and a send failure propagates to the caller. A {@code null} async flag is treated as synchronous.
     *
     * @param bizModel   model that supplies the source data set and receives the result data set
     * @param jSONObject operation configuration, converted to a {@link ProducerEntity}
     * @return a success response carrying the size of the result data set
     * @throws ExecutionException   if a synchronous send fails
     * @throws InterruptedException if the thread is interrupted while waiting for a synchronous send
     */
    // Raw Kafka types are kept deliberately: the record key type and the return type of
    // KafkaProducerConfig.getKafkaProducer are declared outside this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public ResponseData runOpt(BizModel bizModel, JSONObject jSONObject) throws ExecutionException, InterruptedException {
        ProducerEntity producerEntity = JSON.parseObject(JSON.toJSONString(jSONObject), ProducerEntity.class);
        SourceInfo sourceInfo = this.sourceInfoDao.getDatabaseInfoById(producerEntity.getDatabaseId());
        String payload = JSON.toJSONString(bizModel.getDataSet(producerEntity.getSource()).getData());
        ProducerRecord record = new ProducerRecord(
                producerEntity.getTopic(), producerEntity.getPartition(), producerEntity.getKey(), payload);

        SimpleDataSet resultDataSet = new SimpleDataSet();
        AtomicReference<String> resultMessage = new AtomicReference<>("");

        // try-with-resources guarantees the producer is closed even when the send throws.
        try (KafkaProducer kafkaProducer = KafkaProducerConfig.getKafkaProducer(sourceInfo)) {
            // Boolean.TRUE.equals avoids an NPE when the async flag is missing from the configuration.
            if (Boolean.TRUE.equals(producerEntity.getIsAsyn())) {
                kafkaProducer.send(record, (metadata, exc) -> {
                    if (metadata != null) {
                        resultMessage.set(describeSuccess(metadata));
                    } else if (exc != null) {
                        resultMessage.set("消息发送失败，异常信息：" + exc.getMessage());
                    }
                    resultDataSet.setData(resultMessage);
                    bizModel.putDataSet(producerEntity.getId(), resultDataSet);
                });
            } else {
                RecordMetadata metadata = (RecordMetadata) kafkaProducer.send(record).get();
                if (metadata != null) {
                    resultMessage.set(describeSuccess(metadata));
                }
                resultDataSet.setData(resultMessage);
                bizModel.putDataSet(producerEntity.getId(), resultDataSet);
            }
        }

        // Read the size only after the producer is closed: per the Kafka client documentation,
        // close() blocks until previously sent requests complete, so the async callback has run by now.
        return BuiltInOperation.getResponseSuccessData(resultDataSet.getSize());
    }

    /** Builds the success message reporting the topic, partition and offset of a sent record. */
    private static String describeSuccess(RecordMetadata metadata) {
        return "消息发送成功，topic=" + metadata.topic() + "，分区=" + metadata.partition() + "，offset=" + metadata.offset();
    }
}
