/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.core.task;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import javax.sql.DataSource;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreatorFactory;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
import org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Inventory task.
 *
 * <p>Wires a {@link Dumper} and an {@link Importer} together through a single
 * {@link PipelineChannel}. Starting the task submits the importer to the supplied
 * {@link ExecuteEngine}, runs the dumper on the calling thread and then waits for
 * the importer's future. The position of the last non-placeholder record
 * acknowledged through the channel is tracked and exposed as task progress.</p>
 */
public final class InventoryTask
extends AbstractLifecycleExecutor
implements PipelineTask,
AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(InventoryTask.class);
    private final String taskId;
    private final ExecuteEngine importerExecuteEngine;
    private final PipelineChannel channel;
    private final Dumper dumper;
    private final Importer importer;
    // Written from the channel acknowledgement callback and read by getTaskProgress()/toString(),
    // hence volatile.
    private volatile IngestPosition<?> position;

    /**
     * Creates the task together with its channel, dumper and importer.
     *
     * @param inventoryDumperConfig dumper configuration; also supplies the task id parts and the initial position
     * @param importerConfig importer configuration
     * @param pipelineChannelCreator factory used to create the channel shared by dumper and importer
     * @param dataSourceManager data source manager handed to the importer
     * @param sourceDataSource source data source handed to the dumper
     * @param sourceMetaDataLoader source table meta data loader handed to the dumper
     * @param importerExecuteEngine engine the importer is submitted to when the task starts
     * @param jobProgressListener progress listener handed to the importer
     */
    public InventoryTask(InventoryDumperConfiguration inventoryDumperConfig, ImporterConfiguration importerConfig, PipelineChannelCreator pipelineChannelCreator, PipelineDataSourceManager dataSourceManager, DataSource sourceDataSource, PipelineTableMetaDataLoader sourceMetaDataLoader, ExecuteEngine importerExecuteEngine, PipelineJobProgressListener jobProgressListener) {
        this.importerExecuteEngine = importerExecuteEngine;
        this.taskId = this.generateTaskId(inventoryDumperConfig);
        this.channel = this.createChannel(pipelineChannelCreator);
        this.dumper = InventoryDumperCreatorFactory.getInstance(inventoryDumperConfig.getDataSourceConfig().getDatabaseType().getType()).createInventoryDumper(inventoryDumperConfig, this.channel, sourceDataSource, sourceMetaDataLoader);
        this.importer = ImporterCreatorFactory.getInstance((String)importerConfig.getDataSourceConfig().getDatabaseType().getType()).createImporter(importerConfig, dataSourceManager, this.channel, jobProgressListener);
        this.position = inventoryDumperConfig.getPosition();
    }

    /**
     * Builds the task id as {@code <dataSourceName>.<actualTableName>}, with
     * {@code #<shardingItem>} appended when a sharding item is configured.
     *
     * @param inventoryDumperConfig dumper configuration to read the id parts from
     * @return generated task id
     */
    private String generateTaskId(InventoryDumperConfiguration inventoryDumperConfig) {
        String result = String.format("%s.%s", inventoryDumperConfig.getDataSourceName(), inventoryDumperConfig.getActualTableName());
        return null == inventoryDumperConfig.getShardingItem() ? result : result + "#" + inventoryDumperConfig.getShardingItem();
    }

    /**
     * Submits the importer to the execute engine, runs the dumper on the calling
     * thread, then waits for the importer's future to complete.
     *
     * @throws PipelineJobExecutionException if the importer future completed exceptionally
     */
    protected void doStart() {
        Future<?> future = this.importerExecuteEngine.submit((LifecycleExecutor)this.importer, new ExecuteCallback(){

            @Override
            public void onSuccess() {
                log.info("importer onSuccess, taskId={}", InventoryTask.this.taskId);
            }

            @Override
            public void onFailure(Throwable throwable) {
                // The trailing Throwable has no matching placeholder, so SLF4J logs it with its stack trace.
                log.error("importer onFailure, taskId={}", InventoryTask.this.taskId, throwable);
                // Stop the whole task when the importer fails.
                InventoryTask.this.stop();
            }
        });
        this.dumper.start();
        this.waitForResult(future);
        log.info("importer future done");
    }

    /**
     * Creates the channel whose acknowledgement callback advances {@link #position}
     * to the position of the last non-placeholder record of each acknowledged batch.
     *
     * @param pipelineChannelCreator channel factory
     * @return created channel
     */
    private PipelineChannel createChannel(PipelineChannelCreator pipelineChannelCreator) {
        return pipelineChannelCreator.createPipelineChannel(1, records -> {
            Record lastNormalRecord = this.getLastNormalRecord(records);
            if (null != lastNormalRecord) {
                this.position = lastNormalRecord.getPosition();
            }
        });
    }

    /**
     * Finds the last record whose position is not a {@link PlaceholderPosition}.
     *
     * @param records records to scan, from last to first
     * @return the matching record, or {@code null} if there is none
     */
    private Record getLastNormalRecord(List<Record> records) {
        for (int index = records.size() - 1; index >= 0; --index) {
            Record record = records.get(index);
            if (record.getPosition() instanceof PlaceholderPosition) {
                continue;
            }
            return record;
        }
        return null;
    }

    /**
     * Blocks until the given future completes.
     *
     * <p>If the waiting thread is interrupted, the wait is abandoned and the thread's
     * interrupt status is restored so that callers can still observe the interruption.</p>
     *
     * @param future importer future to wait for
     * @throws PipelineJobExecutionException if the future completed exceptionally; wraps the underlying cause
     */
    private void waitForResult(Future<?> future) {
        try {
            future.get();
        }
        catch (InterruptedException ignored) {
            // Do not swallow the interruption: re-assert the flag for code further up the stack.
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException ex) {
            throw new PipelineJobExecutionException(String.format("Task %s execute failed ", this.taskId), ex.getCause());
        }
    }

    /**
     * Stops the dumper and then the importer. The importer is stopped even if
     * stopping the dumper throws.
     */
    protected void doStop() {
        try {
            this.dumper.stop();
        }
        finally {
            this.importer.stop();
        }
    }

    /**
     * Returns a progress snapshot built from the currently tracked position.
     *
     * @return inventory task progress
     */
    public InventoryTaskProgress getTaskProgress() {
        return new InventoryTaskProgress(this.position);
    }

    /**
     * Closes the channel shared by the dumper and the importer.
     */
    @Override
    public void close() {
        this.channel.close();
    }

    @Generated
    public String toString() {
        return "InventoryTask(taskId=" + this.getTaskId() + ", position=" + this.position + ")";
    }

    /**
     * Returns the id of this task.
     *
     * @return task id
     */
    @Override
    @Generated
    public String getTaskId() {
        return this.taskId;
    }
}

