package com.centit.dde.bizopt;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.centit.dde.consumer.ConsumerEntity;
import com.centit.dde.consumer.KafkaConsumerConfig;
import com.centit.dde.core.BizModel;
import com.centit.dde.core.BizOperation;
import com.centit.dde.core.SimpleDataSet;
import com.centit.framework.common.ResponseData;
import com.centit.product.adapter.po.SourceInfo;
import com.centit.product.metadata.dao.SourceInfoDao;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:com/centit/dde/bizopt/ConsumerBizOperation.class */
public class ConsumerBizOperation implements BizOperation {
    private SourceInfoDao sourceInfoDao;

    public ConsumerBizOperation(SourceInfoDao sourceInfoDao) {
        this.sourceInfoDao = sourceInfoDao;
    }

    public ResponseData runOpt(BizModel bizModel, JSONObject jSONObject) {
        ConsumerEntity consumerEntity = (ConsumerEntity) JSON.parseObject(JSON.toJSONString(jSONObject), ConsumerEntity.class);
        SourceInfo databaseInfoById = this.sourceInfoDao.getDatabaseInfoById(consumerEntity.getDatabaseId());
        JSONObject extProps = databaseInfoById.getExtProps();
        extProps.put("group.id", consumerEntity.getGroupId());
        KafkaConsumer kafkaConsumer = KafkaConsumerConfig.getKafkaConsumer(extProps, databaseInfoById);
        JSONArray topic = consumerEntity.getTopic();
        if (topic != null || topic.size() == 0) {
            throw new RuntimeException("topic不能为空！");
        }
        if (topic.size() == 1 && topic.getString(0).contains(".*")) {
            kafkaConsumer.subscribe(Pattern.compile(topic.getString(0)));
        } else {
            kafkaConsumer.subscribe(topic);
        }
        ConsumerRecords poll = kafkaConsumer.poll(1000L);
        kafkaConsumer.commitSync();
        if (kafkaConsumer != null) {
            kafkaConsumer.close();
        }
        ArrayList arrayList = new ArrayList();
        Iterator it = poll.iterator();
        while (it.hasNext()) {
            arrayList.add(((ConsumerRecord) it.next()).value());
        }
        SimpleDataSet simpleDataSet = new SimpleDataSet(arrayList);
        bizModel.putDataSet(consumerEntity.getId(), simpleDataSet);
        return BuiltInOperation.getResponseSuccessData(simpleDataSet.getSize());
    }
}
