/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.execute;

import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobPreparer;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PipelineJobExecutor
extends AbstractLifecycleExecutor {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(PipelineJobExecutor.class);
    private final ExecutorService executor = Executors.newFixedThreadPool(20);

    protected void doStart() {
        PipelineAPIFactory.getGovernanceRepositoryAPI().watch("/pipeline", event -> {
            if (PipelineMetaDataNode.BARRIER_PATTERN.matcher(event.getKey()).matches() && event.getType() == DataChangedEvent.Type.ADDED) {
                PipelineDistributedBarrier.getInstance().checkChildrenNodeCount(event);
            }
            this.getJobConfigPOJO(event).ifPresent(optional -> this.processEvent(event, (JobConfigurationPOJO)optional));
        });
    }

    private Optional<JobConfigurationPOJO> getJobConfigPOJO(DataChangedEvent event) {
        try {
            if (PipelineMetaDataNode.CONFIG_PATTERN.matcher(event.getKey()).matches()) {
                log.info("{} job config: {}", (Object)event.getType(), (Object)event.getKey());
                return Optional.of(YamlEngine.unmarshal((String)event.getValue(), JobConfigurationPOJO.class, (boolean)true));
            }
        }
        catch (Exception ex) {
            log.error("analyze job config pojo failed.", (Throwable)ex);
        }
        return Optional.empty();
    }

    private void processEvent(DataChangedEvent event, JobConfigurationPOJO jobConfigPOJO) {
        boolean isDeleted = DataChangedEvent.Type.DELETED == event.getType();
        boolean isDisabled = jobConfigPOJO.isDisabled();
        if (isDeleted || isDisabled) {
            String jobId = jobConfigPOJO.getJobName();
            log.info("jobId={}, deleted={}, disabled={}", new Object[]{jobId, isDeleted, isDisabled});
            MigrationJobConfiguration jobConfig = YamlMigrationJobConfigurationSwapper.swapToObject((String)jobConfigPOJO.getJobParameter());
            if (isDeleted) {
                new MigrationJobPreparer().cleanup(jobConfig);
            } else if (PipelineJobProgressDetector.isJobSuccessful(jobConfig.getJobShardingCount(), MigrationJobAPIFactory.getInstance().getJobProgress(jobConfig).values())) {
                log.info("isJobSuccessful=true");
                new MigrationJobPreparer().cleanup(jobConfig);
            }
            PipelineJobCenter.stop(jobId);
            return;
        }
        switch (event.getType()) {
            case ADDED: 
            case UPDATED: {
                if (PipelineJobCenter.isJobExisting(jobConfigPOJO.getJobName())) {
                    log.info("{} added to executing jobs failed since it already exists", (Object)jobConfigPOJO.getJobName());
                    break;
                }
                log.info("{} executing jobs", (Object)jobConfigPOJO.getJobName());
                this.executor.execute(() -> this.execute(jobConfigPOJO));
                break;
            }
        }
    }

    private void execute(JobConfigurationPOJO jobConfigPOJO) {
        MigrationJob job = new MigrationJob();
        PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
        OneOffJobBootstrap oneOffJobBootstrap = new OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), (ElasticJob)job, jobConfigPOJO.toJobConfiguration());
        oneOffJobBootstrap.execute();
        job.setOneOffJobBootstrap(oneOffJobBootstrap);
    }

    protected void doStop() {
        this.executor.shutdown();
        this.executor.shutdownNow();
    }
}

