/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.task;

import java.util.Collection;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
import org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tasks runner that first executes the inventory tasks of a pipeline job item and,
 * once all of them are reported as finished, executes its incremental tasks.
 *
 * <p>Tasks are submitted to the two {@link ExecuteEngine}s passed to the constructor;
 * their completion is observed through {@link ExecuteCallback}s created by this class.</p>
 */
public final class InventoryIncrementalTasksRunner
implements PipelineTasksRunner {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(InventoryIncrementalTasksRunner.class);
    private final PipelineJobItemAPI jobItemAPI = new InventoryIncrementalJobItemAPIImpl();
    private final PipelineJobItemContext jobItemContext;
    private final Collection<InventoryTask> inventoryTasks;
    private final Collection<IncrementalTask> incrementalTasks;
    private final ExecuteEngine inventoryDumperExecuteEngine;
    private final ExecuteEngine incrementalDumperExecuteEngine;

    /**
     * Creates a runner for one job item.
     *
     * @param jobItemContext context of the job item; its status and stopping flag are read and updated by this runner
     * @param inventoryTasks inventory tasks to execute first
     * @param incrementalTasks incremental tasks to execute after the inventory tasks are finished
     * @param inventoryDumperExecuteEngine engine the inventory tasks are submitted to
     * @param incrementalDumperExecuteEngine engine the incremental tasks are submitted to
     */
    @Generated
    public InventoryIncrementalTasksRunner(PipelineJobItemContext jobItemContext, Collection<InventoryTask> inventoryTasks, Collection<IncrementalTask> incrementalTasks, ExecuteEngine inventoryDumperExecuteEngine, ExecuteEngine incrementalDumperExecuteEngine) {
        this.jobItemContext = jobItemContext;
        this.inventoryTasks = inventoryTasks;
        this.incrementalTasks = incrementalTasks;
        this.inventoryDumperExecuteEngine = inventoryDumperExecuteEngine;
        this.incrementalDumperExecuteEngine = incrementalDumperExecuteEngine;
    }

    /**
     * Marks the job item as stopping, then stops and closes every inventory task
     * followed by every incremental task.
     */
    public void stop() {
        // Raise the flag first so that start() and the inventory callback stop scheduling new work.
        this.jobItemContext.setStopping(true);
        log.info("stop, jobId={}, shardingItem={}", this.jobItemContext.getJobId(), this.jobItemContext.getShardingItem());
        for (InventoryTask inventoryTask : this.inventoryTasks) {
            log.info("stop inventory task {} - {}", this.jobItemContext.getJobId(), inventoryTask.getTaskId());
            inventoryTask.stop();
            inventoryTask.close();
        }
        for (IncrementalTask incrementalTask : this.incrementalTasks) {
            log.info("stop incremental task {} - {}", this.jobItemContext.getJobId(), incrementalTask.getTaskId());
            incrementalTask.stop();
            incrementalTask.close();
        }
    }

    /**
     * Starts the job item: persists its progress, submits unfinished inventory tasks and,
     * when all inventory tasks were already finished, goes straight to the incremental tasks.
     * Does nothing when the job item is already stopping.
     */
    public void start() {
        if (this.jobItemContext.isStopping()) {
            log.info("job stopping, ignore inventory task");
            return;
        }
        PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(this.jobItemContext.getJobId())).persistJobItemProgress(this.jobItemContext);
        if (!this.executeInventoryTask()) {
            // Inventory tasks were submitted; the inventory callback takes over from here.
            return;
        }
        if (this.jobItemContext.isStopping()) {
            log.info("stopping, ignore incremental task");
            return;
        }
        this.executeIncrementalTask();
    }

    /**
     * Submits every inventory task whose position is not a {@link FinishedPosition}.
     *
     * @return {@code true} when all inventory tasks were already finished and nothing was submitted,
     *         {@code false} when the inventory phase was (re)started
     */
    private synchronized boolean executeInventoryTask() {
        if (PipelineJobProgressDetector.allInventoryTasksFinished(this.inventoryTasks)) {
            log.info("All inventory tasks finished.");
            return true;
        }
        log.info("-------------- Start inventory task --------------");
        this.updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK);
        ExecuteCallback inventoryTaskCallback = this.createInventoryTaskCallback();
        for (InventoryTask each : this.inventoryTasks) {
            if (each.getTaskProgress().getPosition() instanceof FinishedPosition) {
                continue;
            }
            this.inventoryDumperExecuteEngine.submit(each, inventoryTaskCallback);
        }
        return false;
    }

    /**
     * Sets the status on the local job item context and forwards the same status to the job item API.
     *
     * @param jobStatus new status of the job item
     */
    private void updateLocalAndRemoteJobItemStatus(JobStatus jobStatus) {
        this.jobItemContext.setStatus(jobStatus);
        this.jobItemAPI.updateJobItemStatus(this.jobItemContext.getJobId(), this.jobItemContext.getShardingItem(), jobStatus);
    }

    /**
     * Creates the callback shared by all submitted inventory tasks.
     *
     * <p>On success it starts the incremental tasks once all inventory tasks are finished,
     * unless the job item is stopping. On failure it records
     * {@link JobStatus#EXECUTE_INVENTORY_TASK_FAILURE} and stops the runner.</p>
     *
     * @return inventory task callback
     */
    private ExecuteCallback createInventoryTaskCallback() {
        return new ExecuteCallback(){

            @Override
            public void onSuccess() {
                if (!PipelineJobProgressDetector.allInventoryTasksFinished(InventoryIncrementalTasksRunner.this.inventoryTasks)) {
                    return;
                }
                log.info("onSuccess, all inventory tasks finished.");
                // Same guard as in start(): do not launch incremental tasks that stop() has
                // already stopped and closed (or is about to).
                if (InventoryIncrementalTasksRunner.this.jobItemContext.isStopping()) {
                    log.info("stopping, ignore incremental task");
                    return;
                }
                InventoryIncrementalTasksRunner.this.executeIncrementalTask();
            }

            @Override
            public void onFailure(Throwable throwable) {
                log.error("Inventory task execute failed.", throwable);
                InventoryIncrementalTasksRunner.this.updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
                InventoryIncrementalTasksRunner.this.stop();
            }
        };
    }

    /**
     * Submits every incremental task whose position is not a {@link FinishedPosition}.
     * Returns without doing anything when there are no incremental tasks or when the job item
     * status is already {@link JobStatus#EXECUTE_INCREMENTAL_TASK}.
     */
    private synchronized void executeIncrementalTask() {
        if (this.incrementalTasks.isEmpty()) {
            log.info("incrementalTasks empty, ignore");
            return;
        }
        // Both start() and the inventory callback can reach this method; the status check
        // keeps the tasks from being submitted a second time.
        if (JobStatus.EXECUTE_INCREMENTAL_TASK == this.jobItemContext.getStatus()) {
            log.info("job status already EXECUTE_INCREMENTAL_TASK, ignore");
            return;
        }
        log.info("-------------- Start incremental task --------------");
        this.updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
        ExecuteCallback incrementalTaskCallback = this.createIncrementalTaskCallback();
        for (IncrementalTask each : this.incrementalTasks) {
            if (each.getTaskProgress().getPosition() instanceof FinishedPosition) {
                continue;
            }
            this.incrementalDumperExecuteEngine.submit(each, incrementalTaskCallback);
        }
    }

    /**
     * Creates the callback shared by all submitted incremental tasks.
     *
     * <p>Success is ignored. On failure it records
     * {@link JobStatus#EXECUTE_INCREMENTAL_TASK_FAILURE} and stops the runner.</p>
     *
     * @return incremental task callback
     */
    private ExecuteCallback createIncrementalTaskCallback() {
        return new ExecuteCallback(){

            @Override
            public void onSuccess() {
                // Nothing to do when an incremental task completes.
            }

            @Override
            public void onFailure(Throwable throwable) {
                log.error("Incremental task execute failed.", throwable);
                InventoryIncrementalTasksRunner.this.updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
                InventoryIncrementalTasksRunner.this.stop();
            }
        };
    }

    /**
     * Gets the job item context this runner operates on.
     *
     * @return job item context
     */
    @Generated
    public PipelineJobItemContext getJobItemContext() {
        return this.jobItemContext;
    }
}

