package cn.com.yusys.yusp.mq.guard;

import cn.com.yusys.yusp.commons.util.JsonUtils;
import cn.com.yusys.yusp.commons.util.collection.CollectionUtils;
import cn.com.yusys.yusp.mq.guard.consts.MQRecordConst;
import cn.com.yusys.yusp.mq.guard.entity.MqRecord;
import cn.com.yusys.yusp.mq.guard.service.IMqRecordService;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.xxl.job.core.handler.annotation.XxlJob;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.sleuth.SpanNamer;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.instrument.async.TraceRunnable;
import org.springframework.cloud.sleuth.internal.DefaultSpanNamer;
import org.springframework.stereotype.Component;

/**
 * XXL-Job handlers that maintain the MQ record table: one handler re-publishes
 * recorded messages to RabbitMQ, the other removes records via the record service.
 */
@Component
public class MQRecordSomeJob {
    private static final Logger log = LoggerFactory.getLogger(MQRecordSomeJob.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    IMqRecordService mqRecordService;

    @Autowired
    private Tracer tracer;
    private final SpanNamer spanNamer = new DefaultSpanNamer();

    /**
     * Job entry point for message re-delivery. The actual work is wrapped in a
     * {@link TraceRunnable} and executed synchronously on the calling thread.
     */
    @XxlJob("MQRecordSomeJob.mqRecordHandle")
    public void mqRecordHandle() {
        new TraceRunnable(this.tracer, this.spanNamer, this::doMqRecordHandle).run();
    }

    /**
     * Loads the records returned by {@code queryNeedReDeliveryRecord()} and attempts to
     * re-publish each of them. The guard context is always cleared once processing has
     * started, even if an unexpected exception aborts the loop.
     */
    private void doMqRecordHandle() {
        log.info("开始进行MQ消息记录的重新投递");
        List<MqRecord> pendingRecords = this.mqRecordService.queryNeedReDeliveryRecord();
        if (CollectionUtils.isEmpty(pendingRecords)) {
            log.info("没有需要处理的记录");
            return;
        }
        try {
            for (MqRecord pending : pendingRecords) {
                redeliver(pending);
            }
        } finally {
            // The state update in redeliver() runs outside its try/catch and may throw;
            // clearing here prevents a stale recordId from staying in the context.
            MQRecordGuardContext.clear();
        }
        log.info("重新投递处理完成");
    }

    /**
     * Marks a single record as {@code REDELIVERY} and, if that update succeeds,
     * rebuilds the AMQP message from the stored properties/body and publishes it.
     * Failures while building or sending the message are logged and do not stop
     * the processing of the remaining records.
     *
     * @param source the stored record to re-publish
     */
    private void redeliver(MqRecord source) {
        String recordId = source.getRecordId();

        MqRecord stateUpdate = new MqRecord();
        stateUpdate.setRecordId(recordId);
        stateUpdate.setState(MQRecordConst.State.REDELIVERY);
        // A false result is treated as "someone else is already handling this record".
        if (!this.mqRecordService.updateMqRecordExpHandle(stateUpdate)) {
            log.info("当前记录已经在重新处理中 {}", recordId);
            return;
        }

        try {
            MQRecordGuardContext.set("recordId", recordId);
            MessageProperties messageProperties =
                    (MessageProperties) JsonUtils.parse(source.getMsgProperties(), MessageProperties.class);
            byte[] payload = source.getMsgBody().getBytes(StandardCharsets.UTF_8);
            Message message = messageProperties != null
                    ? new Message(payload, messageProperties)
                    : new Message(payload);

            // Prefer the recorded routing key; fall back to the queue name as routing key.
            String routingKey = source.getRoutingKey();
            String targetKey = StringUtils.isNotEmpty(routingKey) ? routingKey : source.getQueue();
            if (StringUtils.isNotEmpty(targetKey)) {
                this.rabbitTemplate.convertAndSend(source.getMqExchange(), targetKey, message);
            } else {
                log.info("无效的记录数据(无法重新投递) {}", recordId);
            }
        } catch (Exception e) {
            // Boundary catch: one bad record (e.g. null body, send failure) must not abort the batch.
            log.error("消息 [{}] 重发异常", recordId, e);
        }
    }

    /**
     * Job entry point for cleanup; delegates to {@code removeMqRecord()} on the record service.
     */
    @XxlJob("MQRecordSomeJob.mqRecordClean")
    public void mqRecordClean() {
        log.info("开始清理消息队列记录表中的数据");
        this.mqRecordService.removeMqRecord();
        log.info("清理完成");
    }
}
