package com.centit.dde.bizopt;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.centit.dde.config.DDEConsumerConfig;
import com.centit.dde.core.BizModel;
import com.centit.dde.core.BizOperation;
import com.centit.dde.core.SimpleDataSet;
import com.centit.dde.entity.ConsumerEntity;
import com.centit.framework.common.ResponseData;
import com.centit.product.adapter.po.SourceInfo;
import com.centit.product.metadata.dao.SourceInfoDao;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:com/centit/dde/bizopt/ConsumerBizOperation.class */
public class ConsumerBizOperation implements BizOperation {
    private SourceInfoDao sourceInfoDao;

    public ConsumerBizOperation() {
    }

    public ConsumerBizOperation(SourceInfoDao sourceInfoDao) {
        this.sourceInfoDao = sourceInfoDao;
    }

    public ResponseData runOpt(BizModel bizModel, JSONObject jSONObject) {
        ConsumerEntity consumerEntity = (ConsumerEntity) JSON.parseObject(JSON.toJSONString(jSONObject), ConsumerEntity.class);
        SourceInfo databaseInfoById = this.sourceInfoDao.getDatabaseInfoById(consumerEntity.getDataSourceID());
        JSONObject extProps = databaseInfoById.getExtProps();
        extProps.put("group.id", consumerEntity.getGroupId());
        KafkaConsumer kafkaConsumer = DDEConsumerConfig.getKafkaConsumer(extProps, databaseInfoById);
        String topic = consumerEntity.getTopic();
        ConsumerRecords consumerRecords = null;
        try {
            try {
                if (StringUtils.isNotBlank(topic)) {
                    String[] split = topic.split(",");
                    if (split.length == 1 && split[0].contains(".*")) {
                        kafkaConsumer.subscribe(Pattern.compile(split[0]));
                    } else {
                        kafkaConsumer.subscribe(Arrays.asList(split));
                    }
                    consumerRecords = kafkaConsumer.poll(1000L);
                }
                if (kafkaConsumer != null) {
                    kafkaConsumer.close();
                }
                ArrayList arrayList = new ArrayList();
                Iterator it = consumerRecords.iterator();
                while (it.hasNext()) {
                    arrayList.add(((ConsumerRecord) it.next()).value());
                }
                SimpleDataSet simpleDataSet = new SimpleDataSet(arrayList);
                bizModel.putDataSet(consumerEntity.getId(), simpleDataSet);
                return BuiltInOperation.getResponseSuccessData(simpleDataSet.getSize());
            } catch (Exception e) {
                ResponseData responseData = BuiltInOperation.getResponseData(0, 500, jSONObject.getString("SetsName") + "异常信息：" + e.getMessage());
                if (kafkaConsumer != null) {
                    kafkaConsumer.close();
                }
                return responseData;
            }
        } catch (Throwable th) {
            if (kafkaConsumer != null) {
                kafkaConsumer.close();
            }
            throw th;
        }
    }
}
