/*
 * Decompiled with CFR 0.152.
 */
package cn.cfit.cnccq.jms;

import cn.cfit.cnccq.commons.CnccqCommandEnum;
import cn.cfit.cnccq.commons.CnccqResultEnum;
import cn.cfit.cnccq.exceptions.CnccqJmsRuntimeException;
import cn.cfit.cnccq.jms.CnccqDestination;
import cn.cfit.cnccq.jms.CnccqJmsContext;
import cn.cfit.cnccq.jms.CnccqMessage;
import cn.cfit.cnccq.jms.CnccqResult;
import cn.cfit.cnccq.jms.CnccqTextMessage;
import cn.cfit.cnccq.logging.Log;
import cn.cfit.cnccq.logging.LogFactory;
import cn.cfit.cnccq.netty.TcpConnection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSConsumer;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.commons.pool2.impl.GenericObjectPool;

/**
 * {@link JMSConsumer} implementation that reads messages for one
 * {@link CnccqDestination} through a {@link CnccqJmsContext}.
 *
 * <p>Messages can be pulled synchronously with {@link #receive()} or pushed to a
 * {@link MessageListener} by a background polling thread started with {@link #init()}.
 *
 * <p>The timed / no-wait / body-typed receive variants and the message selector are
 * not implemented; they always return {@code null}.
 */
public class CnccqJmsConsumer implements JMSConsumer {
    /** Pause applied by the polling thread when a receive returns no message. */
    private static final long EMPTY_QUEUE_WAIT_MILLIS = 10000L;

    private final Log log = LogFactory.getLog(CnccqJmsConsumer.class);
    private final CnccqJmsContext context;
    // Written by the caller's thread and read by the polling thread, hence volatile.
    private volatile MessageListener messageListener;
    private final AtomicBoolean listened = new AtomicBoolean(false);
    private final boolean autoCommit;
    private final TcpConnection tcpConnection;
    private CnccqDestination destination;

    /**
     * Creates a consumer bound to the given destination.
     *
     * @param context       context used for receive / commit / rollback and for the connection pool
     * @param tcpConnection connection used for non-auto-commit receives and for
     *                      {@link #commit()} / {@link #rollback()}; may be {@code null}, in which
     *                      case {@link #commit()}, {@link #rollback()} and {@link #close()} skip it
     * @param destination   destination messages are read from
     * @param autoCommit    whether {@link #receive()} uses the auto-commit receive path
     */
    public CnccqJmsConsumer(CnccqJmsContext context, TcpConnection tcpConnection, CnccqDestination destination, boolean autoCommit) {
        this.context = context;
        this.tcpConnection = tcpConnection;
        this.autoCommit = autoCommit;
        this.destination = destination;
    }

    /**
     * @return the listener registered via {@link #setMessageListener(MessageListener)}, or
     *         {@code null} if none has been set
     */
    @Override
    public MessageListener getMessageListener() throws JMSRuntimeException {
        return this.messageListener;
    }

    /**
     * Creates a destination for {@code queueName} by delegating to the context.
     *
     * @param queueName name of the queue
     * @return the destination produced by the context
     */
    public CnccqDestination createDestination(String queueName) {
        return this.context.createDestination(queueName);
    }

    /**
     * Registers the listener that the polling thread started by {@link #init()} delivers to.
     *
     * @param messageListener listener to register; {@code null} leaves the consumer without a
     *                        listener, so {@link #init()} will not start polling
     */
    @Override
    public void setMessageListener(MessageListener messageListener) throws JMSRuntimeException {
        this.messageListener = messageListener;
        // Only flag the consumer as listening when there is actually something to deliver to.
        this.listened.set(messageListener != null);
    }

    /**
     * @return the auto-commit flag given at construction time
     */
    public boolean isAutoCommit() {
        return this.autoCommit;
    }

    /**
     * Starts a background thread that repeatedly receives messages and hands them to the
     * registered listener. Does nothing (apart from logging) when no listener is registered.
     *
     * <p>The thread stops once {@link #close()} has been called, the listener has been cleared,
     * or the thread has been interrupted.
     */
    public void init() {
        if (!this.listened.get() || Objects.isNull(this.messageListener)) {
            this.log.info("messageListener is null or not listened");
            return;
        }
        new Thread(this::pollLoop).start();
    }

    /** Body of the polling thread: receive, dispatch to the listener, then commit or roll back. */
    private void pollLoop() {
        while (this.listened.get() && !Thread.currentThread().isInterrupted()) {
            // Snapshot the listener so it cannot become null between the check and the call.
            MessageListener listener = this.messageListener;
            if (listener == null) {
                return;
            }
            boolean completed = false;
            try {
                CnccqTextMessage message = this.receive(true);
                if (message != null) {
                    listener.onMessage(message);
                } else {
                    this.sleep();
                }
                completed = true;
            }
            catch (Exception e) {
                this.log.error("messageListener.onMessage error ", e);
                this.context.rollback();
            }
            // Commit only when the iteration succeeded; a failed iteration was rolled back
            // above and must not be committed afterwards.
            if (completed) {
                this.context.commit();
            }
        }
    }

    /** Pauses the polling thread while the queue is empty, preserving the interrupt status. */
    private void sleep() {
        this.log.info("CnccqJmsConsumer  queue is empty wait 10s");
        try {
            Thread.sleep(EMPTY_QUEUE_WAIT_MILLIS);
        }
        catch (InterruptedException e) {
            this.log.error("CnccqJmsConsumer error", e);
            // Restore the flag so the polling loop can observe the interruption and stop.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Receives the next message from this consumer's destination using the consumer's
     * auto-commit setting.
     *
     * @return the received message wrapped in a {@link CnccqTextMessage}, or {@code null} when
     *         the context returned no message
     * @throws CnccqJmsRuntimeException if the underlying receive fails
     */
    @Override
    public CnccqMessage receive() {
        return this.receive(this.autoCommit);
    }

    /**
     * Shared implementation of the destination-bound receive.
     *
     * @param autoCommit whether to use the auto-commit receive path
     * @return the received message wrapped in a {@link CnccqTextMessage}, or {@code null}
     * @throws CnccqJmsRuntimeException if the underlying receive fails
     */
    private CnccqTextMessage receive(boolean autoCommit) {
        if (Objects.isNull(this.destination.getQueueName())) {
            // Logged only; the receive is still attempted with the destination as configured.
            this.log.error("#CnccqJmsConsumer# receive message, queue name is null");
        }
        CnccqMessage message;
        try {
            message = this.receive(this.destination, autoCommit);
        }
        catch (CnccqJmsRuntimeException e) {
            throw e;
        }
        catch (Exception e) {
            throw new CnccqJmsRuntimeException(e.getMessage());
        }
        return message == null ? null : new CnccqTextMessage(message);
    }

    /**
     * Receives a message from {@code destination}. With {@code autoCommit} the context's plain
     * receive is used; otherwise the context's non-committing receive is used together with this
     * consumer's connection. On any failure the context is rolled back before the exception is
     * propagated.
     *
     * @param destination destination to read from
     * @param autoCommit  whether to use the auto-commit receive path
     * @return the message returned by the context (may be {@code null})
     * @throws CnccqJmsRuntimeException if the receive fails
     */
    public CnccqMessage receive(CnccqDestination destination, boolean autoCommit) {
        try {
            return autoCommit ? this.context.receive(destination) : this.context.receiveNotCommit(destination, this.tcpConnection);
        }
        catch (CnccqJmsRuntimeException e) {
            this.context.rollback();
            throw e;
        }
        catch (Exception e) {
            this.context.rollback();
            throw new CnccqJmsRuntimeException(e.getMessage());
        }
    }

    /**
     * Sends a COMMIT command for this consumer's destination over its connection.
     * Does nothing when the consumer has no connection.
     *
     * @throws CnccqJmsRuntimeException if the command result is not successful
     */
    public void commit() {
        if (Objects.isNull(this.tcpConnection)) {
            return;
        }
        CnccqResult<?> resCommit = this.tcpConnection.sendCmd(CnccqCommandEnum.COMMIT, this.destination);
        this.log.info("#CnccqJmsConsumer# receive message commit:" + resCommit);
        if (!resCommit.isSuccess()) {
            this.log.error("#CnccqJmsConsumer# send commit fail actCode:" + resCommit.getCode());
            throw new CnccqJmsRuntimeException("CnccqJmsConsumer# send commit fail actCode:" + resCommit.getCode(), String.valueOf(resCommit.getCode()));
        }
    }

    /**
     * Sends a ROLLBACK command for this consumer's destination over its connection.
     * Does nothing when the consumer has no connection.
     *
     * @throws CnccqJmsRuntimeException if the command result is not successful
     */
    public void rollback() {
        if (Objects.isNull(this.tcpConnection)) {
            return;
        }
        CnccqResult<?> resRollBack = this.tcpConnection.sendCmd(CnccqCommandEnum.ROLLBACK, this.destination);
        if (!resRollBack.isSuccess()) {
            this.log.error("#CnccqJmsConsumer# send rollback fail actCode:" + resRollBack.getCode());
            throw new CnccqJmsRuntimeException("CnccqJmsConsumer# send rollback fail actCode:" + resRollBack.getCode(), String.valueOf(resRollBack.getCode()));
        }
    }

    /**
     * Queries the number of messages in {@code queueName} on this consumer's queue manager.
     *
     * <p>In auto-commit mode a connection is borrowed from the context's pool for the query and
     * always returned afterwards; otherwise the consumer's own connection is used.
     *
     * @param queueName name of the queue to inspect
     * @return the message count reported by the server
     * @throws CnccqJmsRuntimeException if no connection can be borrowed or the query result is
     *                                  not successful
     */
    public int getMessageCount(String queueName) {
        // Built before borrowing so that a failure here cannot strand a pooled connection.
        CnccqDestination cnccqDestination = new CnccqDestination(queueName, this.destination.getQueueManager());
        CnccqResult<Integer> result;
        if (this.autoCommit) {
            GenericObjectPool<TcpConnection> connectPool = this.context.getConnectPool();
            TcpConnection connectionAuto = this.borrowConnection(connectPool);
            try {
                result = connectionAuto.getMessageNumber(cnccqDestination);
            }
            finally {
                // Return the connection even when the query throws, otherwise the pool leaks.
                connectPool.returnObject(connectionAuto);
            }
        } else {
            result = this.tcpConnection.getMessageNumber(cnccqDestination);
        }
        if (result.isSuccess()) {
            return result.getData();
        }
        throw new CnccqJmsRuntimeException("get message num fail\uff1aack" + result.getCode(), String.valueOf(result.getCode()));
    }

    /** Borrows a connection from the pool, translating any failure into a {@link CnccqJmsRuntimeException}. */
    private TcpConnection borrowConnection(GenericObjectPool<TcpConnection> connectPool) {
        try {
            return connectPool.borrowObject();
        }
        catch (CnccqJmsRuntimeException e) {
            this.log.info("CnccqJmsConsumer# getMessageCount fail:" + e.getMessage() + ", ErrorCode:" + e.getErrorCode());
            throw new CnccqJmsRuntimeException("CnccqJmsConsumer# getMessageCount fail:" + e.getMessage(), e.getErrorCode());
        }
        catch (Exception e) {
            this.log.info("CnccqJmsConsumer# getMessageCount fail:" + e + ", ErrorCode:" + CnccqResultEnum.UNKNOWN_ERROR.getCode());
            throw new CnccqJmsRuntimeException("CnccqJmsConsumer# getMessageCount fail:" + e.getMessage(), String.valueOf(CnccqResultEnum.UNKNOWN_ERROR.getCode()));
        }
    }

    /**
     * Not implemented: the timeout is ignored and no receive is attempted.
     *
     * @return always {@code null}
     */
    @Override
    public Message receive(long l) {
        return null;
    }

    /**
     * Not implemented: no receive is attempted.
     *
     * @return always {@code null}
     */
    @Override
    public Message receiveNoWait() {
        return null;
    }

    /**
     * Stops the polling loop (it exits at its next condition check) and releases the consumer's
     * connection, if it has one.
     */
    @Override
    public void close() {
        this.listened.set(false);
        if (this.tcpConnection != null) {
            this.tcpConnection.release();
        }
    }

    /**
     * @return the destination this consumer currently reads from
     */
    public CnccqDestination getDestination() {
        return this.destination;
    }

    /**
     * Replaces the destination this consumer reads from.
     *
     * @param destination new destination
     */
    public void setDestination(CnccqDestination destination) {
        this.destination = destination;
    }

    /**
     * Not implemented.
     *
     * @return always {@code null}
     */
    @Override
    public String getMessageSelector() {
        return null;
    }

    /**
     * Not implemented: no receive is attempted.
     *
     * @return always {@code null}
     */
    @Override
    public <T> T receiveBody(Class<T> aClass) {
        return null;
    }

    /**
     * Not implemented: the timeout is ignored and no receive is attempted.
     *
     * @return always {@code null}
     */
    @Override
    public <T> T receiveBody(Class<T> aClass, long l) {
        return null;
    }

    /**
     * Not implemented: no receive is attempted.
     *
     * @return always {@code null}
     */
    @Override
    public <T> T receiveBodyNoWait(Class<T> aClass) {
        return null;
    }
}

