package cloudflow.core.hadoop;

import cloudflow.bio.bam.BamRecord;
import cloudflow.bio.bam.BamWritableRecord;
import cloudflow.core.Pipeline;
import cloudflow.core.PipelineRunner;
import cloudflow.core.hadoop.records.FloatWritableRecord;
import cloudflow.core.hadoop.records.IWritableRecord;
import cloudflow.core.hadoop.records.IntegerWritableRecord;
import cloudflow.core.hadoop.records.TextWritableRecord;
import cloudflow.core.records.FloatRecord;
import cloudflow.core.records.IntegerRecord;
import cloudflow.core.records.Record;
import cloudflow.core.records.TextRecord;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/* loaded from: input_file:cloudflow/core/hadoop/MapReduceRunner.class */
public class MapReduceRunner extends PipelineRunner {
    private static Map<Class, Class> writableRecords = new HashMap();

    static {
        registerWritableRecord(IntegerRecord.class, IntegerWritableRecord.class);
        registerWritableRecord(FloatRecord.class, FloatWritableRecord.class);
        registerWritableRecord(TextRecord.class, TextWritableRecord.class);
        registerWritableRecord(BamRecord.class, BamWritableRecord.class);
    }

    public static void registerWritableRecord(Class<? extends Record<?, ?>> cls, Class<? extends IWritableRecord> cls2) {
        System.out.println("Record: " + cls.getName());
        writableRecords.put(cls, cls2);
    }

    public static IWritableRecord createWritableRecord(Class<? extends Record<?, ?>> cls) {
        Class cls2 = writableRecords.get(cls);
        if (cls2 != null) {
            try {
                return (IWritableRecord) cls2.newInstance();
            } catch (IllegalAccessException | InstantiationException e) {
                e.printStackTrace();
            }
        }
        throw new NotImplementedException("no support for hadoop for record " + cls.getName() + " implemented!");
    }

    @Override // cloudflow.core.PipelineRunner
    public boolean run(Pipeline pipeline) throws IOException {
        if (!pipeline.check()) {
            return false;
        }
        GenericJob genericJob = new GenericJob(pipeline.getName());
        genericJob.setInput(new String[]{pipeline.getInput()});
        genericJob.setOutput(pipeline.getOutput());
        genericJob.setDriverClass(pipeline.getDriverClass());
        if (!(pipeline.getLoader() instanceof HadoopRecordFileLoader)) {
            System.out.println("Input loader doesn't support MapReduce Hadoop.");
            return false;
        }
        HadoopRecordFileLoader hadoopRecordFileLoader = (HadoopRecordFileLoader) pipeline.getLoader();
        genericJob.setInputFormat(hadoopRecordFileLoader.getInputFormat());
        genericJob.setMapOperations(pipeline.getMapOperations());
        genericJob.setAfterReduceOperations(pipeline.getAfterReduceOperations());
        genericJob.setMapperInputRecords(pipeline.getLoader().getRecordClass());
        genericJob.setCombinerOperations(pipeline.getCombinerOperations());
        hadoopRecordFileLoader.configure(genericJob.getConfiguration());
        genericJob.getConfiguration().set("cloudflow.loader", hadoopRecordFileLoader.getClass().getName());
        pipeline.getConf().writeToConfiguration(genericJob.getConfiguration());
        genericJob.setMapperOutputRecords(pipeline.getMapperOutputRecordClass());
        try {
            Record record = (Record) pipeline.getMapperOutputRecordClass().newInstance();
            IWritableRecord createWritableRecord = createWritableRecord(pipeline.getMapperOutputRecordClass());
            WritableComparable fillWritableKey = createWritableRecord.fillWritableKey(record);
            Writable fillWritableValue = createWritableRecord.fillWritableValue(record);
            genericJob.setMapperOutputRecordsKey(fillWritableKey.getClass());
            genericJob.setMapperOutputRecordsValue(fillWritableValue.getClass());
            System.out.println("Mapper output records: " + pipeline.getMapperOutputRecordClass().getName() + "  (" + fillWritableKey.getClass().getName() + ", " + fillWritableValue.getClass().getName() + ")");
        } catch (IllegalAccessException | InstantiationException e) {
            e.printStackTrace();
            System.exit(1);
        }
        genericJob.setReduceOperations(pipeline.getReduceOperations());
        return genericJob.execute();
    }
}
