package com.centit.dde.bizopt;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.centit.dde.config.DDEProducerConfig;
import com.centit.dde.core.BizModel;
import com.centit.dde.core.BizOperation;
import com.centit.dde.core.DataSet;
import com.centit.dde.core.SimpleDataSet;
import com.centit.dde.entity.ProducerEntity;
import com.centit.framework.common.ResponseData;
import com.centit.product.metadata.dao.SourceInfoDao;
import com.centit.product.metadata.po.SourceInfo;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:com/centit/dde/bizopt/ProducerBizOperation.class */
/**
 * {@link BizOperation} that serialises one data set of a {@link BizModel} to JSON and
 * publishes it as a single Kafka record.
 *
 * <p>The step configuration is read by converting the incoming {@link JSONObject} into a
 * {@link ProducerEntity}. The Kafka connection is built by {@link DDEProducerConfig} from the
 * {@link SourceInfo} looked up through the injected {@link SourceInfoDao}.
 */
public class ProducerBizOperation implements BizOperation {
    /** DAO used to resolve the data-source definition; stays {@code null} with the no-arg constructor. */
    private SourceInfoDao sourceInfoDao;

    /**
     * Creates an operation without a DAO.
     *
     * <p>NOTE(review): {@link #runOpt} dereferences {@code sourceInfoDao} unconditionally, so an
     * instance built this way throws {@link NullPointerException} when run; nothing in this class
     * assigns the field later.
     */
    public ProducerBizOperation() {
    }

    /**
     * Creates an operation that resolves data sources through the given DAO.
     *
     * @param sourceInfoDao DAO queried by data-source id in {@link #runOpt}
     */
    public ProducerBizOperation(SourceInfoDao sourceInfoDao) {
        this.sourceInfoDao = sourceInfoDao;
    }

    /**
     * Sends the configured data set to Kafka and stores the outcome flag back into the model.
     *
     * <p>The record is built from the entity's topic, partition and id, with the JSON form of the
     * source data set's data as value. When {@code getAsyn()} is true the record is sent with a
     * callback; otherwise the call waits on the returned future. In both cases an
     * {@link AtomicBoolean} flag is wrapped in a {@link SimpleDataSet} and stored in the model
     * under the entity's id.
     *
     * @param bizModel   model the source data set is read from and the result data set is put into
     * @param jSONObject step configuration; also read for {@code "SetsName"} when building the
     *                   error message
     * @return the success response built from the result data set's size, or a response built
     *         with code 500 and the exception message when an {@link Exception} is raised while
     *         building, sending or closing
     */
    public ResponseData runOpt(BizModel bizModel, JSONObject jSONObject) {
        ProducerEntity entity = (ProducerEntity) JSON.parseObject(JSON.toJSONString(jSONObject), ProducerEntity.class);
        // These lookups are intentionally outside the try block: their failures propagate to the caller.
        SourceInfo sourceInfo = this.sourceInfoDao.getDatabaseInfoById(entity.getDataSourceID());
        DataSet sourceData = bizModel.getDataSet(entity.getSourceID());
        AtomicBoolean delivered = new AtomicBoolean(false);
        try {
            // Build the record before opening the producer so a bad payload never creates a connection.
            ProducerRecord record = new ProducerRecord(
                    entity.getTopic(), entity.getPartition(), entity.getId(), JSON.toJSONString(sourceData.getData()));
            // try-with-resources closes the producer exactly once on every path; an exception
            // thrown by close() is still handled by the catch below.
            try (KafkaProducer producer = DDEProducerConfig.getKafkaProducer(sourceInfo.getExtProps(), sourceInfo)) {
                if (entity.getAsyn().booleanValue()) {
                    producer.send(record, (recordMetadata, exc) -> {
                        if (recordMetadata != null) {
                            delivered.set(true);
                        } else if (exc != null) {
                            // NOTE(review): no logger is available in this file; consider routing
                            // this through the project's logging facade instead of stderr.
                            exc.printStackTrace();
                        }
                    });
                } else if (producer.send(record).get() != null) {
                    delivered.set(true);
                }
                // The data set wraps the flag object itself; in the async branch the callback may
                // not have run yet when the size below is computed.
                SimpleDataSet resultSet = new SimpleDataSet(delivered);
                bizModel.putDataSet(entity.getId(), resultSet);
                return BuiltInOperation.getResponseSuccessData(resultSet.getSize());
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Future.get() was interrupted: restore the flag so callers up the stack can see it.
                Thread.currentThread().interrupt();
            }
            return BuiltInOperation.getResponseData(0, 500, jSONObject.getString("SetsName") + "异常信息：" + e.getMessage());
        }
    }
}
