package cn.cfit.cnccq.jms;

import cn.cfit.cnccq.commons.CnccqCommandEnum;
import cn.cfit.cnccq.commons.CnccqResultEnum;
import cn.cfit.cnccq.exceptions.CnccqJmsRuntimeException;
import cn.cfit.cnccq.logging.Log;
import cn.cfit.cnccq.logging.LogFactory;
import cn.cfit.cnccq.netty.TcpConnection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSConsumer;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.commons.pool2.impl.BaseObjectPoolConfig;
import org.apache.commons.pool2.impl.GenericObjectPool;

/* loaded from: input_file:cn/cfit/cnccq/jms/CnccqJmsConsumer.class */
public class CnccqJmsConsumer implements JMSConsumer {
    private final CnccqJmsContext context;
    private MessageListener messageListener;
    private final boolean autoCommit;
    private final TcpConnection tcpConnection;
    private CnccqDestination destination;
    private final Log log = LogFactory.getLog((Class<?>) CnccqJmsConsumer.class);
    private final AtomicBoolean listened = new AtomicBoolean(false);

    public CnccqJmsConsumer(CnccqJmsContext cnccqJmsContext, TcpConnection tcpConnection, CnccqDestination cnccqDestination, boolean z) {
        this.context = cnccqJmsContext;
        this.tcpConnection = tcpConnection;
        this.autoCommit = z;
        this.destination = cnccqDestination;
    }

    @Override // javax.jms.JMSConsumer
    public MessageListener getMessageListener() throws JMSRuntimeException {
        return this.messageListener;
    }

    public CnccqDestination createDestination(String str) {
        return this.context.createDestination(str);
    }

    @Override // javax.jms.JMSConsumer
    public void setMessageListener(MessageListener messageListener) throws JMSRuntimeException {
        this.messageListener = messageListener;
        this.listened.set(true);
    }

    public boolean isAutoCommit() {
        return this.autoCommit;
    }

    public void init() {
        if (!this.listened.get() || Objects.isNull(this.messageListener)) {
            this.log.info("messageListener is null or not listened");
        } else {
            new Thread(() -> {
                while (this.listened.get() && Objects.nonNull(this.messageListener)) {
                    try {
                        CnccqTextMessage cnccqTextMessage = (CnccqTextMessage) receive(true);
                        if (cnccqTextMessage != null) {
                            this.messageListener.onMessage(cnccqTextMessage);
                        } else {
                            sleep();
                        }
                    } catch (Exception e) {
                        this.log.error("messageListener.onMessage error ", e);
                        this.context.rollback();
                    } finally {
                        this.context.commit();
                    }
                }
            }).start();
        }
    }

    private void sleep() {
        this.log.info("CnccqJmsConsumer  queue is empty wait 10s");
        try {
            Thread.sleep(BaseObjectPoolConfig.DEFAULT_EVICTOR_SHUTDOWN_TIMEOUT_MILLIS);
        } catch (InterruptedException e) {
            this.log.error("CnccqJmsConsumer error", e);
        }
    }

    @Override // javax.jms.JMSConsumer
    public CnccqMessage receive() {
        if (Objects.isNull(this.destination.getQueueName())) {
            this.log.error("#CnccqJmsConsumer# receive message, queue name is null");
        }
        try {
            CnccqMessage receive = receive(this.destination, this.autoCommit);
            if (Objects.isNull(receive)) {
                return null;
            }
            return new CnccqTextMessage(receive);
        } catch (CnccqJmsRuntimeException e) {
            throw e;
        } catch (Exception e2) {
            throw new CnccqJmsRuntimeException(e2.getMessage());
        }
    }

    private CnccqMessage receive(boolean z) {
        if (Objects.isNull(this.destination.getQueueName())) {
            this.log.error("#CnccqJmsConsumer# receive message, queue name is null");
        }
        try {
            CnccqMessage receive = receive(this.destination, z);
            if (Objects.isNull(receive)) {
                return null;
            }
            return new CnccqTextMessage(receive);
        } catch (CnccqJmsRuntimeException e) {
            throw e;
        } catch (Exception e2) {
            throw new CnccqJmsRuntimeException(e2.getMessage());
        }
    }

    public CnccqMessage receive(CnccqDestination cnccqDestination, boolean z) {
        try {
            return z ? this.context.receive(cnccqDestination) : this.context.receiveNotCommit(cnccqDestination, this.tcpConnection);
        } catch (CnccqJmsRuntimeException e) {
            this.context.rollback();
            throw e;
        } catch (Exception e2) {
            this.context.rollback();
            throw new CnccqJmsRuntimeException(e2.getMessage());
        }
    }

    public void commit() {
        if (Objects.nonNull(this.tcpConnection)) {
            CnccqResult sendCmd = this.tcpConnection.sendCmd(CnccqCommandEnum.COMMIT, this.destination);
            this.log.info("#CnccqJmsConsumer# receive message commit:" + sendCmd);
            if (sendCmd.isSuccess()) {
                return;
            }
            this.log.error("#CnccqJmsConsumer# send commit fail actCode:" + sendCmd.getCode());
            throw new CnccqJmsRuntimeException("CnccqJmsConsumer# send commit fail actCode:" + sendCmd.getCode(), String.valueOf(sendCmd.getCode()));
        }
    }

    public void rollback() {
        if (Objects.nonNull(this.tcpConnection)) {
            CnccqResult sendCmd = this.tcpConnection.sendCmd(CnccqCommandEnum.ROLLBACK, this.destination);
            if (sendCmd.isSuccess()) {
                return;
            }
            this.log.error("#CnccqJmsConsumer# send rollback fail actCode:" + sendCmd.getCode());
            throw new CnccqJmsRuntimeException("CnccqJmsConsumer# send rollback fail actCode:" + sendCmd.getCode(), String.valueOf(sendCmd.getCode()));
        }
    }

    public int getMessageCount(String str) {
        if (!this.autoCommit) {
            CnccqResult<Integer> messageNumber = this.tcpConnection.getMessageNumber(new CnccqDestination(str, this.destination.getQueueManager()));
            if (messageNumber.isSuccess()) {
                return messageNumber.getData().intValue();
            }
            throw new CnccqJmsRuntimeException("get message num fail：ack" + messageNumber.getCode(), String.valueOf(messageNumber.getCode()));
        }
        GenericObjectPool<TcpConnection> connectPool = this.context.getConnectPool();
        try {
            TcpConnection borrowObject = connectPool.borrowObject();
            CnccqResult<Integer> messageNumber2 = borrowObject.getMessageNumber(new CnccqDestination(str, this.destination.getQueueManager()));
            if (borrowObject != null) {
                connectPool.returnObject(borrowObject);
            }
            if (messageNumber2.isSuccess()) {
                return messageNumber2.getData().intValue();
            }
            throw new CnccqJmsRuntimeException("get message num fail：ack" + messageNumber2.getCode(), String.valueOf(messageNumber2.getCode()));
        } catch (CnccqJmsRuntimeException e) {
            this.log.info("CnccqJmsConsumer# getMessageCount fail:" + e.getMessage() + ", ErrorCode:" + e.getErrorCode());
            throw new CnccqJmsRuntimeException("CnccqJmsConsumer# getMessageCount fail:" + e.getMessage(), e.getErrorCode());
        } catch (Exception e2) {
            this.log.info("CnccqJmsConsumer# getMessageCount fail:" + e2 + ", ErrorCode:" + CnccqResultEnum.UNKNOWN_ERROR.getCode());
            throw new CnccqJmsRuntimeException("CnccqJmsConsumer# getMessageCount fail:" + e2.getMessage(), String.valueOf(CnccqResultEnum.UNKNOWN_ERROR.getCode()));
        }
    }

    @Override // javax.jms.JMSConsumer
    public Message receive(long j) {
        return null;
    }

    @Override // javax.jms.JMSConsumer
    public Message receiveNoWait() {
        return null;
    }

    @Override // javax.jms.JMSConsumer, java.lang.AutoCloseable
    public void close() {
        this.listened.set(false);
        if (this.tcpConnection != null) {
            this.tcpConnection.release();
        }
    }

    public CnccqDestination getDestination() {
        return this.destination;
    }

    public void setDestination(CnccqDestination cnccqDestination) {
        this.destination = cnccqDestination;
    }

    @Override // javax.jms.JMSConsumer
    public String getMessageSelector() {
        return null;
    }

    @Override // javax.jms.JMSConsumer
    public <T> T receiveBody(Class<T> cls) {
        return null;
    }

    @Override // javax.jms.JMSConsumer
    public <T> T receiveBody(Class<T> cls, long j) {
        return null;
    }

    @Override // javax.jms.JMSConsumer
    public <T> T receiveBodyNoWait(Class<T> cls) {
        return null;
    }
}
