/*
 * Decompiled with CFR 0.152.
 */
package cn.com.yusys.udp.cloud.message.memory;

import cn.com.yusys.udp.cloud.message.memory.MemoryExecutor;
import cn.com.yusys.udp.cloud.message.memory.MemoryProvisioningProvider;
import cn.com.yusys.udp.cloud.message.persistent.ConfirmChannel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.core.MessageProducer;
import org.springframework.lang.NonNull;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ExecutorSubscribableChannel;

public class MemoryBinder
extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, MemoryProvisioningProvider> {
    private final Logger log = LoggerFactory.getLogger(MemoryBinder.class);
    private final Map<String, SubscribableChannel> registry = new ConcurrentHashMap<String, SubscribableChannel>();
    private final ConfirmChannel confirmChannel;
    private final MemoryExecutor executor;
    private final Object lock = new Object();

    public MemoryBinder(String[] headersToEmbed, MemoryProvisioningProvider provisioningProvider, @NonNull ConfirmChannel confirmChannel, MemoryExecutor executor) {
        super(headersToEmbed, (ProvisioningProvider)provisioningProvider);
        this.confirmChannel = confirmChannel;
        this.executor = executor;
    }

    protected String errorsBaseName(ConsumerDestination destination, String group, ConsumerProperties consumerProperties) {
        return super.errorsBaseName(destination, group, consumerProperties) + ".memory";
    }

    protected String errorsBaseName(ProducerDestination destination) {
        return super.errorsBaseName(destination) + ".memory";
    }

    protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ProducerProperties producerProperties, MessageChannel errorChannel) throws Exception {
        SubscribableChannel channel = this.registrySubscribableChannel(destination.getName(), (ApplicationContext)this.getApplicationContext());
        return message -> {
            this.confirmChannel.send(message);
            channel.send(message);
        };
    }

    protected MessageProducer createConsumerEndpoint(final ConsumerDestination destination, String group, ConsumerProperties properties) throws Exception {
        return new MessageProducer(){
            private MessageChannel channel;

            public MessageChannel getOutputChannel() {
                return this.channel;
            }

            public void setOutputChannel(MessageChannel outputChannel) {
                this.channel = outputChannel;
                MemoryBinder.this.log.debug("Register output Channel");
                SubscribableChannel subscribableChannel = MemoryBinder.this.registrySubscribableChannel(destination.getName(), (ApplicationContext)MemoryBinder.this.getApplicationContext());
                subscribableChannel.subscribe(message -> {
                    MemoryBinder.this.log.debug("Memory Channel Send Message");
                    this.getOutputChannel().send(message);
                });
            }
        };
    }

    void registryChannel(String name, SubscribableChannel channel) {
        this.registry.putIfAbsent(name, channel);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    SubscribableChannel registrySubscribableChannel(String name, ApplicationContext applicationContext) {
        SubscribableChannel channel = this.registry.get(name);
        if (channel == null) {
            Object object = this.lock;
            synchronized (object) {
                ExecutorSubscribableChannel subscribableChannel = new ExecutorSubscribableChannel((Executor)this.executor);
                subscribableChannel.setBeanName(name);
                this.registryChannel(name, (SubscribableChannel)subscribableChannel);
                return subscribableChannel;
            }
        }
        return channel;
    }
}

