package net.x52im.mobileimsdk.server.bridge;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.Map;
import java.util.Observer;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/x52im/mobileimsdk/server/bridge/MQProvider.class */
public class MQProvider {
    private static Logger logger = LoggerFactory.getLogger(MQProvider.class);
    public static final String DEFAULT_ENCODE_CHARSET = "UTF-8";
    public static final String DEFAULT_DECODE_CHARSET = "UTF-8";
    protected ConnectionFactory _factory;
    protected Connection _connection;
    protected Channel _pubChannel;
    protected final Timer timerForStartAgain;
    protected boolean startRunning;
    protected final Timer timerForRetryWorker;
    protected boolean retryWorkerRunning;
    protected ConcurrentLinkedQueue<String[]> publishTrayAgainCache;
    protected boolean publishTrayAgainEnable;
    protected Observer consumerObserver;
    protected String encodeCharset;
    protected String decodeCharset;
    protected String mqURI;
    protected String publishToQueue;
    protected String consumFromQueue;
    protected String TAG;

    public MQProvider(String str, String str2, String str3, String str4, boolean z) {
        this(str, str2, str3, null, null, str4, z);
    }

    public MQProvider(String str, String str2, String str3, String str4, String str5, String str6, boolean z) {
        this._factory = null;
        this._connection = null;
        this._pubChannel = null;
        this.timerForStartAgain = new Timer();
        this.startRunning = false;
        this.timerForRetryWorker = new Timer();
        this.retryWorkerRunning = false;
        this.publishTrayAgainCache = new ConcurrentLinkedQueue<>();
        this.publishTrayAgainEnable = false;
        this.consumerObserver = null;
        this.encodeCharset = null;
        this.decodeCharset = null;
        this.mqURI = null;
        this.publishToQueue = null;
        this.consumFromQueue = null;
        this.TAG = null;
        this.mqURI = str;
        this.publishToQueue = str2;
        this.consumFromQueue = str3;
        this.encodeCharset = str4;
        this.decodeCharset = str5;
        this.TAG = str6;
        if (this.mqURI == null) {
            throw new IllegalArgumentException("[" + str6 + "]无效的参数mqURI ！");
        }
        if (this.publishToQueue == null && this.consumFromQueue == null) {
            throw new IllegalArgumentException("[" + str6 + "]无效的参数，publishToQueue和consumFromQueue至少应设置其一！");
        }
        if (this.encodeCharset == null || this.encodeCharset.trim().length() == 0) {
            this.encodeCharset = "UTF-8";
        }
        if (this.decodeCharset == null || this.decodeCharset.trim().length() == 0) {
            this.decodeCharset = "UTF-8";
        }
        init();
    }

    protected boolean init() {
        String str = this.mqURI;
        this._factory = new ConnectionFactory();
        try {
            this._factory.setUri(str);
            this._factory.setAutomaticRecoveryEnabled(true);
            this._factory.setTopologyRecoveryEnabled(false);
            this._factory.setNetworkRecoveryInterval(5000);
            this._factory.setRequestedHeartbeat(30);
            this._factory.setConnectionTimeout(30000);
            return true;
        } catch (Exception e) {
            logger.error("[" + this.TAG + "] - 【严重】factory.setUri()时出错，Uri格式不对哦，uri=" + str, e);
            return false;
        }
    }

    protected Connection tryGetConnection() {
        if (this._connection == null) {
            try {
                this._connection = this._factory.newConnection();
                this._connection.addShutdownListener(new ShutdownListener() { // from class: net.x52im.mobileimsdk.server.bridge.MQProvider.1
                    public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
                        MQProvider.logger.warn("[" + MQProvider.this.TAG + "] - 连接已经关闭了。。。。【NO】");
                    }
                });
                this._connection.addRecoveryListener(new RecoveryListener() { // from class: net.x52im.mobileimsdk.server.bridge.MQProvider.2
                    public void handleRecovery(Recoverable recoverable) {
                        MQProvider.logger.info("[" + MQProvider.this.TAG + "] - 连接已成功自动恢复了！【OK】");
                        MQProvider.this.start();
                    }
                });
            } catch (Exception e) {
                logger.error("[" + this.TAG + "] - 【NO】getConnection()时出错了，原因是：" + e.getMessage(), e);
                this._connection = null;
                return null;
            }
        }
        return this._connection;
    }

    public void start() {
        if (this.startRunning) {
            return;
        }
        try {
            if (this._factory != null) {
                Connection tryGetConnection = tryGetConnection();
                if (tryGetConnection != null) {
                    whenConnected(tryGetConnection);
                } else {
                    logger.error("[" + this.TAG + "-↑] - [start()中]【严重】connction还没有准备好，conn.createChannel()失败，start()没有继续！(原因：connction==null)【5秒后重新尝试start】");
                    this.timerForStartAgain.schedule(new TimerTask() { // from class: net.x52im.mobileimsdk.server.bridge.MQProvider.3
                        @Override // java.util.TimerTask, java.lang.Runnable
                        public void run() {
                            MQProvider.this.start();
                        }
                    }, 5000L);
                }
            } else {
                logger.error("[" + this.TAG + "-↑] - [start()中]【严重】factory还没有准备好，start()失败！(原因：factory==null)");
            }
        } finally {
            this.startRunning = false;
        }
    }

    protected void whenConnected(Connection connection) {
        startPublisher(connection);
        startWorker(connection);
    }

    protected void startPublisher(Connection connection) {
        if (connection == null) {
            logger.error("[" + this.TAG + "-↑] - [startPublisher()中]【严重】connction还没有准备好，conn.createChannel()失败！(原因：connction==null)");
            return;
        }
        if (this._pubChannel != null && this._pubChannel.isOpen()) {
            try {
                this._pubChannel.close();
            } catch (Exception e) {
                logger.warn("[" + this.TAG + "-↑] - [startPublisher()中]pubChannel.close()时发生错误。", e);
            }
        }
        try {
            this._pubChannel = connection.createChannel();
            logger.info("[" + this.TAG + "-↑] - [startPublisher()中] 的channel成功创建了，马上开始循环publish消息，当前数组队列长度：N/A！【OK】");
            AMQP.Queue.DeclareOk queueDeclare = this._pubChannel.queueDeclare(this.publishToQueue, true, false, false, (Map) null);
            logger.info("[" + this.TAG + "-↑] - [startPublisher中] Queue[当前队列消息数：" + queueDeclare.getMessageCount() + ",消费者：" + queueDeclare.getConsumerCount() + "]已成功建立，Publisher初始化成功，消息将可publish过去且不怕丢失了。【OK】(当前暂存数组长度:N/A)");
            if (this.publishTrayAgainEnable) {
                while (this.publishTrayAgainCache.size() > 0) {
                    String[] poll = this.publishTrayAgainCache.poll();
                    if (poll == null || poll.length <= 0) {
                        logger.debug("[" + this.TAG + "-↑] - [startPublisher()中] [___]在channel成功创建后，当前之前失败暂存的数据队列已为空，publish没有继续！[当前数组队列长度：" + this.publishTrayAgainCache.size() + "]！【OK】");
                        return;
                    } else {
                        logger.debug("[" + this.TAG + "-↑] - [startPublisher()中] [...]在channel成功创建后，正在publish之前失败暂存的消息 m[0]=" + poll[0] + "、m[1]=" + poll[1] + ",、m[2]=" + poll[2] + "，[当前数组队列长度：" + this.publishTrayAgainCache.size() + "]！【OK】");
                        publish(poll[0], poll[1], poll[2]);
                    }
                }
            }
        } catch (Exception e2) {
            logger.error("[" + this.TAG + "-↑] - [startPublisher()中] conn.createChannel()或pubChannel.queueDeclare()出错了，本次startPublisher没有继续！", e2);
        }
    }

    public boolean publish(String str) {
        return publish("", this.publishToQueue, str);
    }

    protected boolean publish(String str, String str2, String str3) {
        boolean z = false;
        try {
            this._pubChannel.basicPublish(str, str2, MessageProperties.PERSISTENT_TEXT_PLAIN, str3.getBytes(this.encodeCharset));
            logger.info("[" + this.TAG + "-↑] - [startPublisher()中] publish()成功了 ！(数据:" + str + "," + str2 + "," + str3 + ")");
            z = true;
        } catch (Exception e) {
            if (this.publishTrayAgainEnable) {
                this.publishTrayAgainCache.add(new String[]{str, str2, str3});
            }
            logger.error("[" + this.TAG + "-↑] - [startPublisher()中] publish()时Exception了，原因：" + e.getMessage() + "【数据[" + str + "," + str2 + "," + str3 + "]已重新放回数组首位，当前数组长度：N/A】", e);
        }
        return z;
    }

    protected void startWorker(Connection connection) {
        try {
            if (this.retryWorkerRunning) {
                return;
            }
            if (connection == null) {
                throw new Exception("[" + this.TAG + "-↓] - 【严重】connction还没有准备好，conn.createChannel()失败！(原因：connction==null)");
            }
            final Channel createChannel = connection.createChannel();
            createChannel.basicConsume(this.consumFromQueue, false, new DefaultConsumer(createChannel) { // from class: net.x52im.mobileimsdk.server.bridge.MQProvider.4
                public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    try {
                        String routingKey = envelope.getRoutingKey();
                        String contentType = basicProperties.getContentType();
                        long deliveryTag = envelope.getDeliveryTag();
                        MQProvider.logger.info("[" + MQProvider.this.TAG + "-↓] - [startWorker()中的handleDelivery] 收到一条新消息(routingKey=" + routingKey + ",contentType=" + contentType + ",consumerTag=" + str + ",deliveryTag=" + deliveryTag + ")，马上开始处理。。。。");
                        if (MQProvider.this.work(bArr)) {
                            createChannel.basicAck(deliveryTag, false);
                        } else {
                            createChannel.basicReject(deliveryTag, true);
                        }
                    } catch (Exception e) {
                        MQProvider.logger.info("[" + MQProvider.this.TAG + "-↓] - [startWorker()中handleDelivery时] 出现错误，错误将被记录：" + e.getMessage(), e);
                    }
                }
            });
            logger.info("[" + this.TAG + "-↓] - [startWorker()中] Worker已经成功开启并运行中...【OK】");
        } catch (Exception e) {
            logger.error("[" + this.TAG + "-↓] - [startWorker()中] conn.createChannel()或Consumer操作时出错了，本次startWorker没有继续【暂停5秒后重试startWorker()】！", e);
            this.timerForRetryWorker.schedule(new TimerTask() { // from class: net.x52im.mobileimsdk.server.bridge.MQProvider.5
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    MQProvider.this.startWorker(MQProvider.this._connection);
                }
            }, 5000L);
        } finally {
            this.retryWorkerRunning = false;
        }
    }

    protected boolean work(byte[] bArr) {
        try {
            logger.info("[" + this.TAG + "-↓] - [startWorker()中work时] Got msg：" + new String(bArr, this.decodeCharset));
            return true;
        } catch (Exception e) {
            logger.warn("[" + this.TAG + "-↓] - [startWorker()中work时] 出现错误，错误将被记录：" + e.getMessage(), e);
            return true;
        }
    }
}
