package com.centit.dde.bizopt;

import com.alibaba.fastjson2.JSONObject;
import com.centit.dde.core.BizModel;
import com.centit.dde.core.BizOperation;
import com.centit.dde.core.DataOptContext;
import com.centit.dde.core.DataSet;
import com.centit.dde.producer.KafkaProducerConfig;
import com.centit.dde.utils.BizModelJSONTransform;
import com.centit.dde.utils.DataSetOptUtil;
import com.centit.framework.common.ResponseData;
import com.centit.product.adapter.po.SourceInfo;
import com.centit.product.metadata.dao.SourceInfoDao;
import com.centit.support.algorithm.BooleanBaseOpt;
import com.centit.support.algorithm.NumberBaseOpt;
import com.centit.support.algorithm.StringBaseOpt;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/* loaded from: input_file:com/centit/dde/bizopt/ProducerBizOperation.class */
/**
 * Business operation that publishes the content of one data set of a
 * {@link BizModel} to a Kafka topic and stores a human readable outcome
 * message as a new data set.
 *
 * <p>Recognised configuration keys (read from the step's JSON description):
 * <ul>
 *   <li>{@code databaseId} - id of the {@link SourceInfo} describing the Kafka service (required)</li>
 *   <li>{@code topic} - target topic (required)</li>
 *   <li>{@code source} - name of the data set whose data is sent (required)</li>
 *   <li>{@code partition} - optional target partition</li>
 *   <li>{@code key} - optional expression resolved against the model; the raw text is used as fallback</li>
 *   <li>{@code isAsyn} - when true the record is sent with a callback instead of waiting on the future</li>
 *   <li>{@code id} - name under which the outcome data set is stored</li>
 * </ul>
 */
public class ProducerBizOperation implements BizOperation {
    private final SourceInfoDao sourceInfoDao;

    /**
     * @param sourceInfoDao DAO used to look up the Kafka connection description by id
     */
    public ProducerBizOperation(SourceInfoDao sourceInfoDao) {
        this.sourceInfoDao = sourceInfoDao;
    }

    /**
     * Sends the configured data set to Kafka.
     *
     * @param bizModel       model holding the source data set; receives the outcome data set
     * @param jSONObject     step configuration, see the class documentation for the keys
     * @param dataOptContext execution context (not used by this operation)
     * @return an error response when the configuration is incomplete or the referenced
     *         resource/data set is missing, otherwise a success response carrying the size
     *         of the outcome data set
     * @throws ExecutionException   if a synchronous send fails
     * @throws InterruptedException if the thread is interrupted while waiting for a synchronous
     *                              send; the interrupt flag is restored before rethrowing
     */
    // The producer stays a raw type because the generic signature of
    // KafkaProducerConfig.getKafkaProducer is not visible from this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public ResponseData runOpt(BizModel bizModel, JSONObject jSONObject, DataOptContext dataOptContext) throws ExecutionException, InterruptedException {
        String databaseId = jSONObject.getString("databaseId");
        String topic = jSONObject.getString("topic");
        if (StringUtils.isBlank(topic) || StringUtils.isBlank(databaseId)) {
            return ResponseData.makeErrorMessage("Kafka服务地址或topic不能为空！");
        }
        String sourceName = jSONObject.getString("source");
        if (StringUtils.isBlank(sourceName)) {
            return ResponseData.makeErrorMessage("推送消息不能为空！");
        }
        SourceInfo sourceInfo = this.sourceInfoDao.getDatabaseInfoById(databaseId);
        if (sourceInfo == null) {
            return ResponseData.makeErrorMessage("Kafka服务资源不存在或已被删除！");
        }
        // A missing source data set means there is nothing to push; report it
        // instead of failing with a NullPointerException.
        DataSet sourceDataSet = bizModel.getDataSet(sourceName);
        if (sourceDataSet == null) {
            return ResponseData.makeErrorMessage("推送消息不能为空！");
        }

        Integer partition = NumberBaseOpt.castObjectToInteger(jSONObject.getInteger("partition"));
        String keyExpression = jSONObject.getString("key");
        String recordKey = StringBaseOpt.castObjectToString(
            DataSetOptUtil.fetchFieldValue(new BizModelJSONTransform(bizModel), keyExpression), keyExpression);
        boolean async = Boolean.TRUE.equals(
            BooleanBaseOpt.castObjectToBoolean(jSONObject.getBoolean("isAsyn"), false));
        String payload = StringBaseOpt.castObjectToString(sourceDataSet.getData());
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, partition, recordKey, payload);

        String resultId = jSONObject.getString("id");
        AtomicReference<String> outcome = new AtomicReference<>("");

        // Everything that can fail before sending has been done above, so the
        // producer is only acquired once it is guaranteed to reach the finally block.
        KafkaProducer producer = KafkaProducerConfig.getKafkaProducer(sourceInfo);
        try {
            if (async) {
                // Register the outcome data set up front so it exists even if the
                // callback has not fired yet; the callback replaces it.
                bizModel.putDataSet(resultId, new DataSet(outcome.get()));
                producer.send(producerRecord, (metadata, exc) -> {
                    // The exception must be checked first: on failure Kafka may still
                    // hand over placeholder metadata.
                    if (exc != null) {
                        outcome.set("消息发送失败，异常信息：" + exc.getMessage());
                    } else if (metadata != null) {
                        outcome.set(describeSuccess(metadata));
                    }
                    bizModel.putDataSet(resultId, new DataSet(outcome.get()));
                });
            } else {
                RecordMetadata metadata = (RecordMetadata) producer.send(producerRecord).get();
                if (metadata != null) {
                    outcome.set(describeSuccess(metadata));
                }
                bizModel.putDataSet(resultId, new DataSet(outcome.get()));
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw e;
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
        // Built after the producer is closed so that, in asynchronous mode, the
        // outstanding send has been given the chance to complete first.
        return BuiltInOperation.createResponseSuccessData(bizModel.getDataSet(resultId).getSize());
    }

    /** Formats the success message for an acknowledged record. */
    private static String describeSuccess(RecordMetadata metadata) {
        return "消息发送成功，topic=" + metadata.topic() + "，分区=" + metadata.partition() + "，offset=" + metadata.offset();
    }
}
