package cloudflow.core.spark;

import cloudflow.core.Operations;
import cloudflow.core.Pipeline;
import cloudflow.core.PipelineRunner;
import cloudflow.core.hadoop.HadoopRecordFileLoader;
import cloudflow.core.hadoop.MapReduceRunner;
import cloudflow.core.hadoop.records.IWritableRecord;
import cloudflow.core.operations.Summarizer;
import cloudflow.core.operations.Transformer;
import cloudflow.core.records.IntegerRecord;
import cloudflow.core.records.Record;
import cloudflow.core.records.RecordList;
import java.io.IOException;
import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

/* loaded from: input_file:cloudflow/core/spark/SparkRunner.class */
public class SparkRunner extends PipelineRunner implements Serializable {
    private Broadcast<Pipeline> pipelineBroadcast;
    PairFlatMapFunction PSEUDO_MAPPER = new PairFlatMapFunction<Tuple2<WritableComparable, Writable>, String, Object>() { // from class: cloudflow.core.spark.SparkRunner.1
        public Iterable<Tuple2<String, Object>> call(Tuple2<WritableComparable, Writable> tuple2) throws Exception {
            Pipeline pipeline = (Pipeline) SparkRunner.this.pipelineBroadcast.getValue();
            IWritableRecord createWritableRecord = MapReduceRunner.createWritableRecord(pipeline.getLoader().getRecordClass());
            RecordList recordList = new RecordList();
            RecordToListWriter recordToListWriter = new RecordToListWriter();
            try {
                pipeline.getMapOperations().createInstances(recordList, recordToListWriter);
            } catch (IllegalAccessException | InstantiationException e) {
                e.printStackTrace();
            }
            recordList.add(createWritableRecord.fillRecord((WritableComparable) tuple2._1(), (Writable) tuple2._2()));
            return recordToListWriter.getMemory();
        }
    };
    Function<Tuple2<String, Iterable<Object>>, Object> PSEUDO_REDUCER = new Function<Tuple2<String, Iterable<Object>>, Object>() { // from class: cloudflow.core.spark.SparkRunner.2
        public Object call(Tuple2<String, Iterable<Object>> tuple2) throws Exception {
            String str = (String) tuple2._1();
            Pipeline pipeline = (Pipeline) SparkRunner.this.pipelineBroadcast.getValue();
            SparkGroupedRecords sparkGroupedRecords = new SparkGroupedRecords(pipeline.getMapperOutputRecordClass());
            Summarizer<Record<?, ?>, Record<?, ?>> summarizer = pipeline.getReduceOperations().createInstances().get(0);
            Operations<Transformer<Record<?, ?>, Record<?, ?>>> afterReduceOperations = pipeline.getAfterReduceOperations();
            RecordToListWriter2 recordToListWriter2 = new RecordToListWriter2();
            afterReduceOperations.createInstances(summarizer.getOutputRecords(), recordToListWriter2);
            sparkGroupedRecords.setKey(str);
            sparkGroupedRecords.setValues(((Iterable) tuple2._2()).iterator());
            summarizer.summarize(str.toString(), sparkGroupedRecords);
            return recordToListWriter2.toString();
        }
    };
    Function PSEUDO_REDUCER_WITHOUT_REDUCER = new Function<Tuple2<String, Integer>, Object>() { // from class: cloudflow.core.spark.SparkRunner.3
        IntegerRecord record = new IntegerRecord();

        public Object call(Tuple2<String, Integer> tuple2) throws Exception {
            Pipeline pipeline = (Pipeline) SparkRunner.this.pipelineBroadcast.getValue();
            RecordToListWriter2 recordToListWriter2 = new RecordToListWriter2();
            RecordList recordList = new RecordList();
            try {
                pipeline.getAfterReduceOperations().createInstances(recordList, recordToListWriter2);
            } catch (IllegalAccessException | InstantiationException e) {
                e.printStackTrace();
            }
            this.record.setKey((String) tuple2._1());
            this.record.setValue((Integer) tuple2._2());
            recordList.add(this.record);
            return recordToListWriter2.toString();
        }
    };
    private String master;

    public SparkRunner(String str) {
        this.master = "";
        this.master = str;
    }

    @Override // cloudflow.core.PipelineRunner
    public boolean run(Pipeline pipeline) throws IOException {
        JavaRDD map;
        pipeline.check();
        if (!(pipeline.getLoader() instanceof HadoopRecordFileLoader)) {
            System.out.println("Input loader doesn't support local files.");
            return false;
        }
        SparkConf appName = new SparkConf().setAppName(pipeline.getName());
        appName.set("spark.executor.memory", "4g");
        JavaSparkContext javaSparkContext = new JavaSparkContext(appName);
        HadoopRecordFileLoader hadoopRecordFileLoader = (HadoopRecordFileLoader) pipeline.getLoader();
        Configuration configuration = new Configuration();
        this.pipelineBroadcast = javaSparkContext.broadcast(pipeline);
        JavaPairRDD newAPIHadoopFile = javaSparkContext.newAPIHadoopFile(pipeline.getInput(), hadoopRecordFileLoader.getInputFormat(), hadoopRecordFileLoader.getInputKeyClass(), hadoopRecordFileLoader.getInputValueClass(), configuration);
        if (pipeline.hasCountOperation()) {
            System.out.println("Count operation detected. switching to sparks internal version.");
            map = newAPIHadoopFile.flatMapToPair(this.PSEUDO_MAPPER).reduceByKey(new Function2<Integer, Integer, Integer>() { // from class: cloudflow.core.spark.SparkRunner.4
                private static final long serialVersionUID = 1;

                public Integer call(Integer num, Integer num2) {
                    return Integer.valueOf(num.intValue() + num2.intValue());
                }
            }).map(this.PSEUDO_REDUCER_WITHOUT_REDUCER);
        } else {
            map = newAPIHadoopFile.flatMapToPair(this.PSEUDO_MAPPER).groupByKey(200).map(this.PSEUDO_REDUCER);
        }
        map.saveAsTextFile(pipeline.getOutput());
        return true;
    }
}
