package org.ow2.proactive.scheduler.ext.masterworker;

import java.io.IOException;
import java.io.Serializable;
import java.security.KeyException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.security.auth.login.LoginException;
import org.apache.xalan.templates.Constants;
import org.objectweb.proactive.Body;
import org.objectweb.proactive.Service;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.core.ProActiveRuntimeException;
import org.objectweb.proactive.core.util.wrapper.BooleanWrapper;
import org.objectweb.proactive.extensions.masterworker.TaskException;
import org.objectweb.proactive.extensions.masterworker.core.AOWorker;
import org.objectweb.proactive.extensions.masterworker.core.ResultInternImpl;
import org.objectweb.proactive.extensions.masterworker.interfaces.internal.TaskIntern;
import org.objectweb.proactive.extensions.masterworker.interfaces.internal.WorkerMaster;
import org.ow2.proactive.authentication.crypto.CredData;
import org.ow2.proactive.authentication.crypto.Credentials;
import org.ow2.proactive.scheduler.common.NotificationData;
import org.ow2.proactive.scheduler.common.Scheduler;
import org.ow2.proactive.scheduler.common.SchedulerAuthenticationInterface;
import org.ow2.proactive.scheduler.common.SchedulerConnection;
import org.ow2.proactive.scheduler.common.SchedulerEvent;
import org.ow2.proactive.scheduler.common.SchedulerEventListener;
import org.ow2.proactive.scheduler.common.exception.InternalSchedulerException;
import org.ow2.proactive.scheduler.common.exception.SchedulerException;
import org.ow2.proactive.scheduler.common.exception.UserException;
import org.ow2.proactive.scheduler.common.job.JobEnvironment;
import org.ow2.proactive.scheduler.common.job.JobId;
import org.ow2.proactive.scheduler.common.job.JobInfo;
import org.ow2.proactive.scheduler.common.job.JobPriority;
import org.ow2.proactive.scheduler.common.job.JobResult;
import org.ow2.proactive.scheduler.common.job.JobState;
import org.ow2.proactive.scheduler.common.job.JobStatus;
import org.ow2.proactive.scheduler.common.job.TaskFlowJob;
import org.ow2.proactive.scheduler.common.job.UserIdentification;
import org.ow2.proactive.scheduler.common.task.JavaTask;
import org.ow2.proactive.scheduler.common.task.TaskInfo;
import org.ow2.proactive.scheduler.common.task.TaskResult;

/* loaded from: input_file:org/ow2/proactive/scheduler/ext/masterworker/AOSchedulerWorker.class */
public class AOSchedulerWorker extends AOWorker implements SchedulerEventListener {
    private static final long serialVersionUID = 31;
    private Scheduler scheduler;
    private HashMap<JobId, Collection<TaskIntern<Serializable>>> processing;
    private String schedulerUrl;
    private String user;
    private String password;
    private String[] jobclasspath;

    public AOSchedulerWorker() {
    }

    public AOSchedulerWorker(String str, WorkerMaster workerMaster, Map<String, Serializable> map, String str2, String str3, String str4, String[] strArr) throws SchedulerException, LoginException {
        super(str, workerMaster, map);
        this.schedulerUrl = str2;
        this.user = str3;
        this.password = str4;
        this.jobclasspath = strArr;
    }

    @Override // org.objectweb.proactive.extensions.masterworker.core.AOWorker, org.objectweb.proactive.InitActive
    public void initActivity(Body body) {
        this.stubOnThis = (AOSchedulerWorker) PAActiveObject.getStubOnThis();
        try {
            SchedulerAuthenticationInterface join = SchedulerConnection.join(this.schedulerUrl);
            try {
                this.scheduler = join.login(Credentials.createCredentials(new CredData(CredData.parseLogin(this.user), CredData.parseDomain(this.user), this.password), join.getPublicKey()));
                this.processing = new HashMap<>();
                try {
                    this.scheduler.addEventListener((AOSchedulerWorker) this.stubOnThis, false, SchedulerEvent.JOB_RUNNING_TO_FINISHED, SchedulerEvent.JOB_PENDING_TO_FINISHED, SchedulerEvent.KILLED, SchedulerEvent.SHUTDOWN, SchedulerEvent.SHUTTING_DOWN);
                } catch (SchedulerException e) {
                    e.printStackTrace();
                }
                PAActiveObject.setImmediateService("heartBeat");
                PAActiveObject.setImmediateService(Constants.ATTRNAME_TERMINATE);
                this.stubOnThis.getTaskAndSchedule();
            } catch (KeyException e2) {
                throw new LoginException("" + e2);
            } catch (LoginException e3) {
                throw new LoginException("Could not retrieve public key, contact the Scheduler admininistrator" + e3);
            }
        } catch (Exception e4) {
            throw new ProActiveRuntimeException(e4);
        }
    }

    @Override // org.objectweb.proactive.extensions.masterworker.core.AOWorker, org.objectweb.proactive.extensions.masterworker.interfaces.internal.Worker
    public void clear() {
        Iterator<JobId> it = this.processing.keySet().iterator();
        while (it.hasNext()) {
            try {
                this.scheduler.killJob(it.next());
            } catch (SchedulerException e) {
                logger.error(e.getMessage());
            }
        }
        this.processing.clear();
        new Service(PAActiveObject.getBodyOnThis()).flushAll();
        this.provider.isCleared(this.stubOnThis);
    }

    @Override // org.objectweb.proactive.extensions.masterworker.core.AOWorker
    public void scheduleTask() {
        if (debug) {
            logger.debug(this.name + " schedules tasks...");
        }
        while (this.pendingTasksFutures.size() > 0) {
            this.pendingTasks.addAll(this.pendingTasksFutures.remove());
        }
        if (!this.suspended && this.pendingTasks.size() > 0) {
            TaskFlowJob taskFlowJob = new TaskFlowJob();
            taskFlowJob.setName("Master-Worker Framework Job " + this.pendingTasks.peek().getId());
            taskFlowJob.setPriority(JobPriority.NORMAL);
            taskFlowJob.setCancelJobOnError(false);
            taskFlowJob.setDescription("Set of parallel master-worker tasks");
            JobEnvironment jobEnvironment = new JobEnvironment();
            try {
                jobEnvironment.setJobClasspath(this.jobclasspath);
                taskFlowJob.setEnvironment(jobEnvironment);
                ArrayList arrayList = new ArrayList();
                while (this.pendingTasks.size() > 0) {
                    TaskIntern<Serializable> remove = this.pendingTasks.remove();
                    arrayList.add(remove);
                    JavaTask javaTask = new JavaTask();
                    javaTask.setName("MasterWorker" + remove.getId());
                    javaTask.setPreciousResult(true);
                    javaTask.setExecutableClassName(SchedulerExecutableAdapter.class.getName());
                    javaTask.addArgument("taskCode", remove);
                    javaTask.addArgument("workerMem", (Serializable) this.initialMemory);
                    try {
                        taskFlowJob.addTask(javaTask);
                    } catch (UserException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    this.processing.put(this.scheduler.submit(taskFlowJob), arrayList);
                } catch (SchedulerException e2) {
                    e2.printStackTrace();
                }
            } catch (IOException e3) {
                e3.printStackTrace();
                return;
            }
        } else if (debug) {
            logger.debug(this.name + " sleeps...");
        }
        this.wakingup = false;
    }

    @Override // org.objectweb.proactive.extensions.masterworker.core.AOWorker, org.objectweb.proactive.extensions.masterworker.interfaces.internal.Worker
    public BooleanWrapper terminate() {
        try {
            this.scheduler.disconnect();
        } catch (SchedulerException e) {
        }
        return super.terminate();
    }

    @Override // org.ow2.proactive.scheduler.common.SchedulerEventListener
    public void schedulerStateUpdatedEvent(SchedulerEvent schedulerEvent) {
        switch (schedulerEvent) {
            case KILLED:
            case SHUTTING_DOWN:
                Iterator<JobId> it = this.processing.keySet().iterator();
                while (it.hasNext()) {
                    jobDidNotSucceed(it.next(), new TaskException(new InternalSchedulerException("Scheduler was " + schedulerEvent)));
                }
                return;
            default:
                return;
        }
    }

    @Override // org.ow2.proactive.scheduler.common.SchedulerEventListener
    public void jobStateUpdatedEvent(NotificationData<JobInfo> notificationData) {
        switch (notificationData.getEventType()) {
            case JOB_PENDING_TO_FINISHED:
            case JOB_RUNNING_TO_FINISHED:
                JobInfo data = notificationData.getData();
                if (data.getStatus() == JobStatus.KILLED) {
                    if (this.processing.containsKey(data.getJobId())) {
                        jobDidNotSucceed(data.getJobId(), new TaskException(new InternalSchedulerException("Job id=" + data.getJobId() + " was killed")));
                        return;
                    }
                    return;
                }
                if (debug) {
                    logger.debug(this.name + " receives job finished event...");
                }
                if (data != null && this.processing.containsKey(data.getJobId())) {
                    try {
                        JobResult jobResult = this.scheduler.getJobResult(data.getJobId());
                        if (debug) {
                            logger.debug(getName() + ": updating results of job: " + jobResult.getName());
                        }
                        Collection<TaskIntern<Serializable>> remove = this.processing.remove(data.getJobId());
                        ArrayList arrayList = new ArrayList();
                        Map<String, TaskResult> exceptionResults = jobResult.hadException() ? jobResult.getExceptionResults() : jobResult.getAllResults();
                        for (TaskIntern<Serializable> taskIntern : remove) {
                            if (debug) {
                                logger.debug(getName() + ": looking for result of task: " + taskIntern.getId());
                            }
                            ResultInternImpl resultInternImpl = new ResultInternImpl(taskIntern.getId());
                            TaskResult taskResult = exceptionResults.get("MasterWorker" + taskIntern.getId());
                            if (taskResult == null) {
                                resultInternImpl.setException(new InternalSchedulerException("Task id=" + taskIntern.getId() + " was not returned by the scheduler"));
                                if (debug) {
                                    logger.debug("Task result not found in job result: " + resultInternImpl.getException().getMessage());
                                }
                            } else if (taskResult.hadException()) {
                                resultInternImpl.setException(taskResult.getException());
                                if (debug) {
                                    logger.debug("Task result contains exception: " + resultInternImpl.getException().getMessage());
                                }
                            } else {
                                try {
                                    resultInternImpl.setResult(taskResult.value());
                                } catch (Throwable th) {
                                    resultInternImpl.setException(th);
                                    if (debug) {
                                        logger.debug(resultInternImpl.getException().getMessage());
                                    }
                                }
                            }
                            arrayList.add(resultInternImpl);
                        }
                        this.pendingTasksFutures.offer(this.provider.sendResultsAndGetTasks(arrayList, this.name, true));
                        this.stubOnThis.scheduleTask();
                        return;
                    } catch (SchedulerException e) {
                        jobDidNotSucceed(data.getJobId(), new TaskException(e));
                        return;
                    }
                }
                return;
            default:
                return;
        }
    }

    @Override // org.ow2.proactive.scheduler.common.SchedulerEventListener
    public void jobSubmittedEvent(JobState jobState) {
    }

    @Override // org.ow2.proactive.scheduler.common.SchedulerEventListener
    public void taskStateUpdatedEvent(NotificationData<TaskInfo> notificationData) {
    }

    @Override // org.ow2.proactive.scheduler.common.SchedulerEventListener
    public void usersUpdatedEvent(NotificationData<UserIdentification> notificationData) {
    }

    private void jobDidNotSucceed(JobId jobId, Exception exc) {
        if (debug) {
            logger.debug("Job did not succeed: " + exc.getMessage());
        }
        if (this.processing.containsKey(jobId)) {
            Collection<TaskIntern<Serializable>> remove = this.processing.remove(jobId);
            ArrayList arrayList = new ArrayList();
            Iterator<TaskIntern<Serializable>> it = remove.iterator();
            while (it.hasNext()) {
                ResultInternImpl resultInternImpl = new ResultInternImpl(it.next().getId());
                resultInternImpl.setException(exc);
                arrayList.add(resultInternImpl);
            }
            this.pendingTasksFutures.offer(this.provider.sendResultsAndGetTasks(arrayList, this.name, true));
            this.stubOnThis.scheduleTask();
        }
    }
}
