package org.objectweb.proactive.extensions.calcium;

import java.io.File;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.objectweb.proactive.annotation.PublicAPI;
import org.objectweb.proactive.core.util.log.Loggers;
import org.objectweb.proactive.core.util.log.ProActiveLogger;
import org.objectweb.proactive.extensions.calcium.environment.FileServerClient;
import org.objectweb.proactive.extensions.calcium.exceptions.PanicException;
import org.objectweb.proactive.extensions.calcium.futures.CalFuture;
import org.objectweb.proactive.extensions.calcium.futures.CalFutureImpl;
import org.objectweb.proactive.extensions.calcium.skeletons.InstructionBuilderVisitor;
import org.objectweb.proactive.extensions.calcium.skeletons.Skeleton;
import org.objectweb.proactive.extensions.calcium.system.SkeletonSystemImpl;
import org.objectweb.proactive.extensions.calcium.system.files.FileStaging;
import org.objectweb.proactive.extensions.calcium.task.Task;
import org.objectweb.proactive.extensions.calcium.task.TaskPriority;

@PublicAPI
/* loaded from: input_file:org/objectweb/proactive/extensions/calcium/Stream.class */
public class Stream<T extends Serializable, R extends Serializable> {
    private Facade facade;
    private Skeleton<T, R> skeleton;
    private FileServerClient fserver;
    static Logger logger = ProActiveLogger.getLogger(Loggers.SKELETONS_KERNEL);
    private static File DEFAULT_OUTPUT_ROOT_DIR = SkeletonSystemImpl.newDirInTmp("calcium-output");
    private int streamPriority = TaskPriority.DEFAULT_PRIORITY;
    BlockingQueue<CalFuture<R>> list = new LinkedBlockingQueue();

    /* JADX INFO: Access modifiers changed from: package-private */
    public Stream(Facade facade, FileServerClient fileServerClient, Skeleton<T, R> skeleton) {
        this.skeleton = skeleton;
        this.facade = facade;
        this.fserver = fileServerClient;
    }

    public CalFuture<R> input(T t) throws PanicException {
        return input((Stream<T, R>) t, DEFAULT_OUTPUT_ROOT_DIR);
    }

    public CalFuture<R> input(T t, File file) throws PanicException {
        Task<?> stageInput = FileStaging.stageInput(this.fserver, new Task(t));
        InstructionBuilderVisitor instructionBuilderVisitor = new InstructionBuilderVisitor();
        this.skeleton.accept(instructionBuilderVisitor);
        stageInput.setStack(instructionBuilderVisitor.stack);
        stageInput.setPriority(new TaskPriority(this.streamPriority));
        CalFutureImpl<?> calFutureImpl = new CalFutureImpl<>(stageInput.taskId, this.fserver, file);
        this.facade.putTask(stageInput, calFutureImpl);
        return calFutureImpl;
    }

    public List<CalFuture<R>> input(List<T> list) throws InterruptedException, PanicException {
        return input(list, DEFAULT_OUTPUT_ROOT_DIR);
    }

    public List<CalFuture<R>> input(List<T> list, File file) throws InterruptedException, PanicException {
        Vector vector = new Vector(list.size());
        Iterator<T> it = list.iterator();
        while (it.hasNext()) {
            vector.add(input((Stream<T, R>) it.next(), file));
        }
        return vector;
    }

    public void input(T t, BlockingQueue<CalFuture<R>> blockingQueue, File file) throws PanicException {
        ((CalFutureImpl) input((Stream<T, R>) t, file)).setCallBackQueue(blockingQueue);
    }

    public void input(T t, BlockingQueue<CalFuture<R>> blockingQueue) throws PanicException {
        input(t, blockingQueue, DEFAULT_OUTPUT_ROOT_DIR);
    }

    public void submit(T t, File file) throws PanicException {
        ((CalFutureImpl) input((Stream<T, R>) t, file)).setCallBackQueue(this.list);
    }

    public void submit(T t) throws PanicException {
        submit(t, DEFAULT_OUTPUT_ROOT_DIR);
    }

    public CalFuture<R> retrieve(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.list.poll(j, timeUnit);
    }

    public CalFuture<R> retrieve() throws InterruptedException {
        return this.list.take();
    }
}
