/*
 * Decompiled with CFR 0.152.
 */
package com.seeyon.ctp.common.mq;

import com.seeyon.ctp.common.constants.SystemProperties;
import com.seeyon.ctp.common.exceptions.BusinessException;
import com.seeyon.ctp.common.log.CtpLogFactory;
import com.seeyon.ctp.common.mq.ChannelTypeEnum;
import com.seeyon.ctp.common.mq.Consummer;
import com.seeyon.ctp.common.mq.MQProcessorFactory;
import com.seeyon.ctp.common.mq.MapMessage;
import com.seeyon.ctp.common.mq.Message;
import com.seeyon.ctp.common.mq.MessageAckTypeEnum;
import com.seeyon.ctp.common.mq.MessageReceiver;
import com.seeyon.ctp.common.mq.ObjectMessage;
import com.seeyon.ctp.common.mq.Producer;
import com.seeyon.ctp.common.mq.TextMessage;
import java.io.File;
import java.io.Serializable;
import java.util.HashMap;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;

/**
 * Static facade over the message-queue producer and consumer used by this package.
 *
 * <p>{@link #initialize()} optionally starts an embedded ActiveMQ broker and then creates the
 * {@link Producer} and {@link Consummer} through {@link MQProcessorFactory}, using settings read
 * from {@link SystemProperties}. The {@code send*} and {@code subscribe} methods throw a
 * {@link BusinessException} until the component they need has been created.
 *
 * <p>Property keys read by this class: {@code mq.config.type}, {@code mq.config.connection},
 * {@code mq.config.user}, {@code mq.config.password}, {@code mq.config.minThreadNum},
 * {@code mq.config.maxThreadNum} and {@code ctp.base.folder}.
 *
 * <p>NOTE(review): the static fields are neither volatile nor synchronized; this class assumes
 * {@code initialize()} completes before other threads call in - confirm against the caller.
 */
public class MQHandler {
    private static final Log logger = CtpLogFactory.getLog(MQHandler.class);

    /** Exception text: "MQ components are not initialized, please check whether MQ is enabled". */
    private static final String NOT_INITIALIZED_MESSAGE = "MQ\u76f8\u5173\u7ec4\u4ef6\u672a\u521d\u59cb\u5316\uff0c\u8bf7\u68c0\u67e5\u662f\u5426\u5f00\u542f";

    private static final String ACTIVE_MQ_TYPE = "ActiveMQ";
    private static final String KEY_TYPE = "mq.config.type";
    private static final String KEY_CONNECTION = "mq.config.connection";
    private static final String KEY_USER = "mq.config.user";
    private static final String KEY_PASSWORD = "mq.config.password";
    private static final String KEY_MIN_THREADS = "mq.config.minThreadNum";
    private static final String KEY_MAX_THREADS = "mq.config.maxThreadNum";
    private static final String KEY_BASE_FOLDER = "ctp.base.folder";

    /** Created by {@link #initProducer()}; {@code null} until that succeeds. */
    private static Producer producer;

    /** Created by {@link #initConsumer()}; {@code null} until that succeeds. */
    private static Consummer consumer;

    /** {@code true} once {@link #initialize()} has created at least one of producer/consumer. */
    private static boolean enabled = false;

    /**
     * Starts as {@code true}; set to {@code false} during initialization when the configured MQ
     * type is not ActiveMQ, or the connection setting is missing or does not mention
     * {@code localhost}.
     */
    public static boolean isEmbeddedServer = true;

    /**
     * Starts the embedded broker when the configuration asks for one, then creates the producer
     * and the consumer (in that order).
     *
     * <p>A {@link BusinessException} from either creation step is logged, not rethrown. The
     * handler is marked enabled only if at least one component was actually created, so a failed
     * initialization surfaces later as a {@link BusinessException} from the public methods rather
     * than as a {@link NullPointerException}.
     */
    static void initialize() {
        if (!startMQServerIfNecessary()) {
            // "Failed to start the embedded MQ, please check!"
            logger.error("****************** \u542f\u52a8\u5d4c\u5165\u5f0fMQ\u51fa\u9519\uff0c\u8bf7\u68c0\u67e5\uff01******************");
        }
        try {
            initProducer();
            initConsumer();
        } catch (BusinessException e) {
            // "Failed to initialize the MQ Consumer or Producer"
            logger.error("****************** \u521d\u59cb\u5316MQ Consumer\u6216Producer\u51fa\u9519 ******************");
            logger.error(e.getLocalizedMessage(), e);
        }
        // Reflect what was really created instead of unconditionally claiming success.
        enabled = producer != null || consumer != null;
    }

    /**
     * Sends a text message to a topic.
     *
     * @param topic destination topic name
     * @param key   message key
     * @param text  message payload
     * @throws BusinessException if the producer is not initialized or sending fails
     */
    public static void sendTopicTextMessage(String topic, String key, String text) throws BusinessException {
        sendTextMessage(ChannelTypeEnum.TOPIC, topic, key, text);
    }

    /**
     * Sends a text message to a queue.
     *
     * @param queue destination queue name
     * @param key   message key
     * @param text  message payload
     * @throws BusinessException if the producer is not initialized or sending fails
     */
    public static void sendQueueTextMessage(String queue, String key, String text) throws BusinessException {
        sendTextMessage(ChannelTypeEnum.QUEUE, queue, key, text);
    }

    /**
     * Sends a map message to a topic. The parameter stays a raw {@code HashMap} so existing
     * callers keep compiling.
     *
     * @param topic destination topic name
     * @param key   message key
     * @param data  message payload
     * @throws BusinessException if the producer is not initialized or sending fails
     */
    public static void sendTopicMapMessage(String topic, String key, HashMap data) throws BusinessException {
        sendMapMessage(ChannelTypeEnum.TOPIC, topic, key, data);
    }

    /**
     * Sends a map message to a queue. The parameter stays a raw {@code HashMap} so existing
     * callers keep compiling.
     *
     * @param queue destination queue name
     * @param key   message key
     * @param data  message payload
     * @throws BusinessException if the producer is not initialized or sending fails
     */
    public static void sendQueueMapMessage(String queue, String key, HashMap data) throws BusinessException {
        sendMapMessage(ChannelTypeEnum.QUEUE, queue, key, data);
    }

    /**
     * Sends a serializable object to a topic.
     *
     * @param topic  destination topic name
     * @param key    message key
     * @param object message payload
     * @throws BusinessException if the producer is not initialized or sending fails
     */
    public static void sendTopicObjectMessage(String topic, String key, Serializable object) throws BusinessException {
        sendObjectMessage(ChannelTypeEnum.TOPIC, topic, key, object);
    }

    /**
     * Sends a serializable object to a queue.
     *
     * @param queue  destination queue name
     * @param key    message key
     * @param object message payload
     * @throws BusinessException if the producer is not initialized or sending fails
     */
    public static void sendQueueObjectMessage(String queue, String key, Serializable object) throws BusinessException {
        sendObjectMessage(ChannelTypeEnum.QUEUE, queue, key, object);
    }

    /**
     * Sends an already-built message to the channel it names; a channel type of
     * {@link ChannelTypeEnum#QUEUE} goes to a queue, any other type goes to a topic.
     *
     * @param message message carrying its own channel type and channel name
     * @throws BusinessException if the producer is not initialized, if {@code message} or its
     *                           channel type is {@code null}, or if the producer reports a failure
     */
    public static void sendMessage(Message message) throws BusinessException {
        Producer activeProducer = requireProducer();
        if (message == null) {
            throw new BusinessException("MQ message must not be null");
        }
        ChannelTypeEnum channelType = message.getChannelTypeEnum();
        if (channelType == null) {
            throw new BusinessException("MQ message has no channel type, channel=" + message.getChannel());
        }
        if (ChannelTypeEnum.QUEUE.equals(channelType)) {
            activeProducer.sendQueueMessage(message.getChannel(), message);
        } else {
            activeProducer.sendTopicMessage(message.getChannel(), message);
        }
    }

    /**
     * Subscribes a receiver using {@link MessageAckTypeEnum#AUTO_ACK}.
     *
     * @param channelTypeEnum queue or topic
     * @param channel         channel name
     * @param receiver        callback that receives the messages
     * @throws BusinessException if the consumer is not initialized or subscribing fails
     */
    public static void subscribe(ChannelTypeEnum channelTypeEnum, String channel, MessageReceiver receiver) throws BusinessException {
        subscribe(MessageAckTypeEnum.AUTO_ACK, channelTypeEnum, channel, receiver);
    }

    /**
     * Subscribes a receiver with an explicit acknowledgement type.
     *
     * @param ackType         acknowledgement type handed to the consumer
     * @param channelTypeEnum queue or topic
     * @param channel         channel name
     * @param receiver        callback that receives the messages
     * @throws BusinessException if the consumer is not initialized or subscribing fails
     */
    public static void subscribe(MessageAckTypeEnum ackType, ChannelTypeEnum channelTypeEnum, String channel, MessageReceiver receiver) throws BusinessException {
        requireConsumer().subscribe(ackType, channelTypeEnum, channel, receiver);
    }

    /** Returns the producer, or throws the "not initialized" error when it is unavailable. */
    private static Producer requireProducer() throws BusinessException {
        Producer current = producer;
        if (!enabled || current == null) {
            throw new BusinessException(NOT_INITIALIZED_MESSAGE);
        }
        return current;
    }

    /** Returns the consumer, or throws the "not initialized" error when it is unavailable. */
    private static Consummer requireConsumer() throws BusinessException {
        Consummer current = consumer;
        if (!enabled || current == null) {
            throw new BusinessException(NOT_INITIALIZED_MESSAGE);
        }
        return current;
    }

    /** Wraps {@code text} in a {@link TextMessage} addressed to the given channel and sends it. */
    private static void sendTextMessage(ChannelTypeEnum channelTypeEnum, String channel, String key, String text) throws BusinessException {
        TextMessage message = new TextMessage(text);
        message.setChannelTypeEnum(channelTypeEnum);
        message.setChannel(channel);
        message.setKey(key);
        sendMessage(message);
    }

    /** Wraps {@code data} in a {@link MapMessage} addressed to the given channel and sends it. */
    private static void sendMapMessage(ChannelTypeEnum channelTypeEnum, String channel, String key, HashMap data) throws BusinessException {
        MapMessage message = new MapMessage(data);
        message.setChannelTypeEnum(channelTypeEnum);
        message.setChannel(channel);
        message.setKey(key);
        sendMessage(message);
    }

    /** Wraps {@code object} in an {@link ObjectMessage} addressed to the given channel and sends it. */
    private static void sendObjectMessage(ChannelTypeEnum channelTypeEnum, String channel, String key, Serializable object) throws BusinessException {
        ObjectMessage message = new ObjectMessage(object);
        message.setChannelTypeEnum(channelTypeEnum);
        message.setChannel(channel);
        message.setKey(key);
        sendMessage(message);
    }

    /**
     * Starts an embedded ActiveMQ broker when the configured type is {@code ActiveMQ} and the
     * connection setting contains {@code localhost}.
     *
     * @return {@code true} when no embedded broker is needed or it started; {@code false} when
     *         the connection setting is missing/blank or the broker failed to start
     */
    private static boolean startMQServerIfNecessary() {
        SystemProperties properties = SystemProperties.getInstance();
        if (!ACTIVE_MQ_TYPE.equals(properties.getProperty(KEY_TYPE))) {
            isEmbeddedServer = false;
            // "Not ActiveMQ, the local embedded ActiveMQ is not started"
            logger.info("\u975eActiveMQ \u4e0d\u542f\u52a8\u672c\u5730\u5d4c\u5165\u5f0fActiveMQ");
            return true;
        }

        String connection = properties.getProperty(KEY_CONNECTION);
        if (connection == null || connection.trim().isEmpty()) {
            // Without a connector address there is nothing to bind the broker to.
            isEmbeddedServer = false;
            logger.error("MQ property '" + KEY_CONNECTION + "' is not configured; embedded ActiveMQ cannot be started");
            return false;
        }
        if (!connection.contains("localhost")) {
            isEmbeddedServer = false;
            // "Configured address is not localhost, the local embedded ActiveMQ is not started"
            logger.info("IP\u5730\u5740\u914d\u7f6e\u975elocalhost\uff0c\u4e0d\u542f\u52a8\u672c\u5730\u5d4c\u5165\u5f0fActiveMQ");
            return true;
        }

        // "Starting the embedded ActiveMQ Server ......"
        logger.info("\u542f\u52a8\u5d4c\u5165\u5f0fActiveMQ Server ......");
        try {
            BrokerService broker = new BrokerService();
            // Trimmed so the broker binds to the same address the failover URL points at.
            broker.addConnector(connection.trim());
            broker.setUseJmx(false);
            broker.setPersistent(true);
            // NOTE(review): a missing ctp.base.folder yields the relative path "null/activemq",
            // exactly as before - confirm the property is always set.
            String mqDataPath = properties.getProperty(KEY_BASE_FOLDER) + "/activemq";
            broker.getPersistenceAdapter().setDirectory(new File(mqDataPath));
            broker.start();
        } catch (Exception e) {
            logger.error(e.getLocalizedMessage(), e);
            return false;
        }
        // "Embedded ActiveMQ Server started"
        logger.info("\u542f\u52a8\u5d4c\u5165\u5f0fActiveMQ Server \u5b8c\u6210");
        return true;
    }

    /**
     * Creates the consumer and its thread pool from the configured settings.
     *
     * <p>The static {@code consumer} field is assigned only after the thread pool call returns,
     * so a failure part-way through never publishes a half-initialized consumer.
     *
     * @throws BusinessException if a required property is missing or malformed, or the factory
     *                           or consumer reports a failure
     */
    private static void initConsumer() throws BusinessException {
        SystemProperties properties = SystemProperties.getInstance();
        String mqType = properties.getProperty(KEY_TYPE);
        String connUrl = buildFailoverUrl();
        String user = properties.getProperty(KEY_USER);
        String password = properties.getProperty(KEY_PASSWORD);
        int minThreadNum = readIntProperty(KEY_MIN_THREADS);
        int maxThreadNum = readIntProperty(KEY_MAX_THREADS);

        Consummer created = MQProcessorFactory.generateConsumer(mqType, connUrl, user, password);
        // Third argument is passed through unchanged; presumably a queue capacity - TODO confirm
        // against Consummer.initThreadPoolExecutor.
        created.initThreadPoolExecutor(minThreadNum, maxThreadNum, 1000);
        consumer = created;
    }

    /**
     * Creates the producer from the configured settings.
     *
     * @throws BusinessException if the connection property is missing or the factory fails
     */
    private static void initProducer() throws BusinessException {
        SystemProperties properties = SystemProperties.getInstance();
        String mqType = properties.getProperty(KEY_TYPE);
        String connUrl = buildFailoverUrl();
        String user = properties.getProperty(KEY_USER);
        String password = properties.getProperty(KEY_PASSWORD);
        producer = MQProcessorFactory.generateProducer(mqType, connUrl, user, password);
    }

    /**
     * Wraps the configured connection string in a {@code failover:(...)} URL with the reconnect
     * delays used by both the producer and the consumer.
     *
     * @throws BusinessException if {@code mq.config.connection} is missing or blank
     */
    private static String buildFailoverUrl() throws BusinessException {
        String mqConnUrl = SystemProperties.getInstance().getProperty(KEY_CONNECTION);
        if (mqConnUrl == null || mqConnUrl.trim().isEmpty()) {
            throw new BusinessException("MQ property '" + KEY_CONNECTION + "' is not configured");
        }
        return "failover:(" + mqConnUrl.trim() + ")?initialReconnectDelay=1000&maxReconnectDelay=30000";
    }

    /**
     * Reads an integer-valued property, tolerating surrounding whitespace.
     *
     * @throws BusinessException if the property is missing or is not a valid integer
     */
    private static int readIntProperty(String key) throws BusinessException {
        String raw = SystemProperties.getInstance().getProperty(key);
        if (raw == null) {
            throw new BusinessException("MQ property '" + key + "' is not configured");
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            // Only the message-only BusinessException constructor is visible from this file, so
            // the offending key and value are carried in the message instead of a chained cause.
            throw new BusinessException("MQ property '" + key + "' is not a valid integer: " + raw);
        }
    }
}

