package org.objectweb.proactive.core.body.ft.protocols.cic.managers;

import java.io.IOException;
import java.rmi.RemoteException;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Vector;
import org.apache.log4j.Logger;
import org.objectweb.proactive.core.ProActiveException;
import org.objectweb.proactive.core.UniqueID;
import org.objectweb.proactive.core.body.AbstractBody;
import org.objectweb.proactive.core.body.UniversalBody;
import org.objectweb.proactive.core.body.ft.checkpointing.Checkpoint;
import org.objectweb.proactive.core.body.ft.checkpointing.CheckpointInfo;
import org.objectweb.proactive.core.body.ft.exception.ProtocolErrorException;
import org.objectweb.proactive.core.body.ft.internalmsg.FTMessage;
import org.objectweb.proactive.core.body.ft.internalmsg.GlobalStateCompletion;
import org.objectweb.proactive.core.body.ft.internalmsg.OutputCommit;
import org.objectweb.proactive.core.body.ft.message.HistoryUpdater;
import org.objectweb.proactive.core.body.ft.message.ReplyLog;
import org.objectweb.proactive.core.body.ft.message.RequestLog;
import org.objectweb.proactive.core.body.ft.protocols.FTManager;
import org.objectweb.proactive.core.body.ft.protocols.cic.infos.CheckpointInfoCIC;
import org.objectweb.proactive.core.body.ft.protocols.cic.infos.MessageInfoCIC;
import org.objectweb.proactive.core.body.future.MethodCallResult;
import org.objectweb.proactive.core.body.message.Message;
import org.objectweb.proactive.core.body.reply.Reply;
import org.objectweb.proactive.core.body.reply.ReplyImpl;
import org.objectweb.proactive.core.body.request.AwaitedRequest;
import org.objectweb.proactive.core.body.request.BlockingRequestQueue;
import org.objectweb.proactive.core.body.request.BlockingRequestQueueImpl;
import org.objectweb.proactive.core.body.request.Request;
import org.objectweb.proactive.core.body.request.RequestImpl;
import org.objectweb.proactive.core.mop.Utils;
import org.objectweb.proactive.core.security.exceptions.CommunicationForbiddenException;
import org.objectweb.proactive.core.security.exceptions.RenegotiateSessionException;
import org.objectweb.proactive.core.util.MutableLong;
import org.objectweb.proactive.core.util.log.Loggers;
import org.objectweb.proactive.core.util.log.ProActiveLogger;

/* loaded from: input_file:WEB-INF/lib/proactive-programming-bundle-5.2.0-update-10.jar:org/objectweb/proactive/core/body/ft/protocols/cic/managers/FTManagerCIC.class */
public class FTManagerCIC extends FTManager {
    public static final int RESEND_MESSAGE = -3;
    public static final int RECOVER = -4;
    protected static Logger logger = ProActiveLogger.getLogger(Loggers.FAULT_TOLERANCE_CIC);
    private int incarnation;
    private int lastRecovery;
    private int checkpointIndex;
    private long checkpointTimer;
    private int nextMax;
    private int historyIndex;
    private Hashtable<Integer, Vector<RequestLog>> requestToResend;
    private int latestRequestLog;
    private Hashtable<Integer, Vector<ReplyLog>> replyToResend;
    private int latestReplyLog;
    private Vector<AwaitedRequest> awaitedRequests;
    private MessageInfoCIC forSentRequest;
    private MessageInfoCIC forSentReply;
    private Vector<UniqueID> history;
    private final Character historyLock = 'l';
    private long deliveredRequestsCounter;
    private MutableLong lastServedRequestIndex;
    private Hashtable<UniqueID, MutableLong> localVectorClock;
    private long historyBaseIndex;
    private long lastCommitedIndex;
    private boolean completingCheckpoint;
    private static final boolean isOCEnable = false;

    @Override // org.objectweb.proactive.core.body.ft.protocols.FTManager
    public int init(AbstractBody abstractBody) throws ProActiveException {
        super.init(abstractBody);
        this.incarnation = 1;
        this.checkpointIndex = 0;
        this.historyIndex = 0;
        this.nextMax = 1;
        this.lastRecovery = 0;
        this.checkpointTimer = System.currentTimeMillis();
        this.requestToResend = new Hashtable<>();
        this.latestRequestLog = 0;
        this.replyToResend = new Hashtable<>();
        this.latestReplyLog = 0;
        this.history = new Vector<>();
        this.awaitedRequests = new Vector<>();
        this.forSentRequest = new MessageInfoCIC();
        this.forSentReply = new MessageInfoCIC();
        this.deliveredRequestsCounter = -1L;
        this.lastCommitedIndex = -1L;
        this.lastServedRequestIndex = new MutableLong(0L);
        this.historyBaseIndex = 0L;
        this.completingCheckpoint = false;
        this.localVectorClock = new Hashtable<>();
        this.localVectorClock.put(this.ownerID, this.lastServedRequestIndex);
        logger.info(" CIC fault-tolerance is enabled for body " + this.ownerID);
        return 0;
    }

    @Override // org.objectweb.proactive.core.body.ft.protocols.FTManager
    public int onReceiveReply(Reply reply) {
        reply.setFTManager(this);
        return incarnationTest(reply);
    }

    @Override // org.objectweb.proactive.core.body.ft.protocols.FTManager
    public int onReceiveRequest(Request request) {
        request.setFTManager(this);
        return incarnationTest(request);
    }

    private int incarnationTest(Message message) {
        if (!isSignificant(message)) {
            return 0;
        }
        MessageInfoCIC messageInfoCIC = (MessageInfoCIC) message.getMessageInfo();
        int i = this.incarnation;
        char c = messageInfoCIC.incarnation;
        if (c > i) {
            message.setIgnoreIt(true);
            return -3;
        }
        if (c >= i) {
            return 0;
        }
        message.setIgnoreIt(true);
        return -4;
    }

    @Override // org.objectweb.proactive.core.body.ft.protocols.FTManager
    public synchronized int onDeliverReply(Reply reply) {
        int i = this.checkpointIndex;
        if (isSignificant(reply)) {
            MessageInfoCIC messageInfoCIC = (MessageInfoCIC) reply.getMessageInfo();
            updateHistory(messageInfoCIC.historyIndex);
            if (messageInfoCIC.checkpointIndex > i) {
                this.nextMax = Math.max(this.nextMax, (int) messageInfoCIC.checkpointIndex);
            }
        }
        return i;
    }

    @Override // org.objectweb.proactive.core.body.ft.protocols.FTManager
    public synchronized int onDeliverRequest(Request request) {
        int i = this.checkpointIndex;
        if (isSignificant(request)) {
            MessageInfoCIC messageInfoCIC = (MessageInfoCIC) request.getMessageInfo();
            updateHistory(messageInfoCIC.historyIndex);
            if (updateAwaitedRequests(request)) {
                request.setIgnoreIt(true);
            } else if (this.completingCheckpoint) {
                synchronized (this.historyLock) {
                    this.history.add(request.getSourceBodyID());
                }
                this.deliveredRequestsCounter++;
            }
            char c = messageInfoCIC.checkpointIndex;
            if (c > i) {
                this.nextMax = Math.max(this.nextMax, (int) c);
                messageInfoCIC.isOrphanFor = c;
            }
        }
        return i;
    }

    private void updateHistory(int i) {
        if (i > this.historyIndex) {
            commitHistories(this.checkpointIndex, this.deliveredRequestsCounter, true, true);
            if (this.completingCheckpoint) {
                this.completingCheckpoint = false;
            }
        }
    }

    private boolean isSignificant(Message message) {
        return (message.getMessageInfo() == null || message.getMessageInfo().isFromHalfBody()) ? false : true;
    }

    private void updateLocalVectorClock(Hashtable<UniqueID, MutableLong> hashtable) {
        Enumeration<UniqueID> keys = hashtable.keys();
        while (keys.hasMoreElements()) {
            UniqueID nextElement = keys.nextElement();
            MutableLong mutableLong = this.localVectorClock.get(nextElement);
            MutableLong mutableLong2 = hashtable.get(nextElement);
            if (mutableLong == null) {
                this.localVectorClock.put(nextElement, new MutableLong(mutableLong2.getValue()));
            } else if (mutableLong.isLessThan(mutableLong2)) {
                mutableLong.setValue(mutableLong2.getValue());
            }
        }
    }

    @Override // org.objectweb.proactive.core.body.ft.protocols.FTManager
    public synchronized int onSendReplyBefore(Reply reply) {
        this.forSentReply.checkpointIndex = (char) this.checkpointIndex;
        this.forSentReply.historyIndex = (char) this.historyIndex;
        this.forSentReply.incarnation = (char) this.incarnation;
        this.forSentReply.lastRecovery = (char) this.lastRecovery;
        this.forSentReply.isOrphanFor = (char) 65535;
        this.forSentReply.fromHalfBody = false;
        this.forSentReply.vectorClock = null;
        reply.setMessageInfo(this.forSentReply);
        return 0;
    }

    @Override // org.objectweb.proactive.core.body.ft.protocols.FTManager
    public synchronized int onSendRequestBefore(Request request) {
        this.forSentRequest.checkpointIndex = (char) this.checkpointIndex;
        this.forSentRequest.historyIndex = (char) this.historyIndex;
        this.forSentRequest.incarnation = (char) this.incarnation;
        this.forSentRequest.lastRecovery = (char) this.lastRecovery;
        this.forSentRequest.isOrphanFor = (char) 65535;
        this.forSentRequest.fromHalfBody = false;
        request.setMessageInfo(this.forSentRequest);
        return 0;
    }

    private boolean isOutputCommit(Message message) {
        return false;
    }

    @Override // org.objectweb.proactive.core.body.ft.protocols.FTManager
    public synchronized int onSendReplyAfter(Reply reply, int i, UniversalBody universalBody) {
        if (i == -3) {
            try {
                reply.setIgnoreIt(false);
                Thread.sleep(FTManager.TIME_TO_RESEND);
                return onSendReplyAfter(reply, sendReply(reply, universalBody), universalBody);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        int i2 = this.checkpointIndex;
        if (i <= i2) {
            return 0;
        }
        this.nextMax = Math.max(this.nextMax, i);
        extendReplyLog(i);
        try {
            ReplyLog replyLog = new ReplyLog(new ReplyImpl(reply.getSourceBodyID(), reply.getSequenceNumber(), reply.getMethodName(), (MethodCallResult) Utils.makeDeepCopy(reply.getResult()), null), universalBody.getRemoteAdapter());
            for (int i3 = i2 + 1; i3 <= i; i3++) {
                this.replyToResend.get(new Integer(i3)).add(replyLog);
            }
            return 0;
        } catch (IOException e2) {
            e2.printStackTrace();
            return 0;
        }
    }

    @Override // org.objectweb.proactive.core.body.ft.protocols.FTManager
    public synchronized int onSendRequestAfter(Request request, int i, UniversalBody universalBody) throws RenegotiateSessionException, CommunicationForbiddenException {
        if (i == -3) {
            try {
                request.resetSendCounter();
                request.setIgnoreIt(false);
                Thread.sleep(FTManager.TIME_TO_RESEND);
                return onSendRequestAfter(request, sendRequest(request, universalBody), universalBody);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (RenegotiateSessionException e2) {
                throw e2;
            }
        }
        int i2 = this.checkpointIndex;
        if (i <= i2) {
            return 0;
        }
        this.nextMax = Math.max(this.nextMax, i);
        extendRequestLog(i);
        try {
            request.getMethodCall().makeDeepCopyOfArguments();
            request.resetSendCounter();
            RequestLog requestLog = new RequestLog(request, universalBody.getRemoteAdapter());
            for (int i3 = i2 + 1; i3 <= i; i3++) {
                this.requestToResend.get(new Integer(i3)).add(requestLog);
            }
            return 0;
        } catch (IOException e3) {
            e3.printStackTrace();
            return 0;
        }
    }

    @Override // org.objectweb.proactive.core.body.ft.protocols.FTManager
    public int onServeRequestBefore(Request request) {
        while (haveToCheckpoint()) {
            checkpoint(request);
        }
        return 0;
    }

    @Override // org.objectweb.proactive.core.body.ft.protocols.FTManager
    public int onServeRequestAfter(Request request) {
        return 0;
    }

    @Override // org.objectweb.proactive.core.body.ft.protocols.FTManager
    public int beforeRestartAfterRecovery(CheckpointInfo checkpointInfo, int i) {
        CheckpointInfoCIC checkpointInfoCIC = (CheckpointInfoCIC) checkpointInfo;
        BlockingRequestQueue requestQueue = this.owner.getRequestQueue();
        int i2 = checkpointInfoCIC.checkpointIndex;
        this.history = new Vector<>();
        this.completingCheckpoint = false;
        this.lastCommitedIndex = checkpointInfoCIC.lastCommitedIndex;
        this.deliveredRequestsCounter = checkpointInfoCIC.lastCommitedIndex;
        this.historyBaseIndex = checkpointInfoCIC.lastCommitedIndex + 1;
        this.awaitedRequests = new Vector<>();
        this.replyToResend = new Hashtable<>();
        this.requestToResend = new Hashtable<>();
        this.checkpointIndex = i2;
        this.nextMax = i2;
        this.checkpointTimer = System.currentTimeMillis();
        this.historyIndex = i2;
        this.lastRecovery = i2;
        this.incarnation = i;
        Request request = checkpointInfoCIC.pendingRequest;
        if (request != null) {
            requestQueue.addToFront(request);
        }
        filterQueue(requestQueue, checkpointInfoCIC);
        Iterator<UniqueID> it = checkpointInfoCIC.history.iterator();
        while (it.hasNext()) {
            AwaitedRequest awaitedRequest = new AwaitedRequest(it.next());
            requestQueue.add(awaitedRequest);
            this.awaitedRequests.add(awaitedRequest);
        }
        this.owner.acceptCommunication();
        try {
            this.location.updateLocation(this.ownerID, this.owner.getRemoteAdapter());
            this.recovery.updateState(this.ownerID, 2);
        } catch (RemoteException e) {
            logger.error("Unable to connect with location server");
            e.printStackTrace();
        }
        sendLogs((CheckpointInfoCIC) checkpointInfo);
        return 0;
    }

    @Override // org.objectweb.proactive.core.body.ft.protocols.FTManager
    public void updateLocationAtServer(UniqueID uniqueID, UniversalBody universalBody) {
        try {
            this.location.updateLocation(uniqueID, universalBody);
        } catch (RemoteException e) {
            logger.error("Unable to connect with location server");
            e.printStackTrace();
        }
    }

    private boolean updateAwaitedRequests(Request request) {
        AwaitedRequest awaitedRequest = null;
        Iterator<AwaitedRequest> it = this.awaitedRequests.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            AwaitedRequest next = it.next();
            if (next.getAwaitedSender().equals(request.getSourceBodyID())) {
                awaitedRequest = next;
                break;
            }
        }
        if (awaitedRequest == null) {
            return false;
        }
        awaitedRequest.setAwaitedRequest(request);
        this.awaitedRequests.remove(awaitedRequest);
        return true;
    }

    private boolean haveToCheckpoint() {
        return this.nextMax > this.checkpointIndex || this.checkpointTimer + ((long) this.ttc) < System.currentTimeMillis();
    }

    private Checkpoint checkpoint(Request request) {
        Checkpoint checkpoint;
        this.owner.blockCommunication();
        synchronized (this.historyLock) {
            try {
                try {
                    synchronized (this) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("[CIC] Checkpointing with index = " + (this.checkpointIndex + 1));
                        }
                        CheckpointInfoCIC checkpointInfoCIC = new CheckpointInfoCIC();
                        extendReplyLog(this.checkpointIndex + 1);
                        extendRequestLog(this.checkpointIndex + 1);
                        checkpointInfoCIC.replyToResend = this.replyToResend.get(Integer.valueOf(this.checkpointIndex + 1));
                        checkpointInfoCIC.requestToResend = this.requestToResend.get(Integer.valueOf(this.checkpointIndex + 1));
                        checkpointInfoCIC.pendingRequest = request;
                        checkpointInfoCIC.checkpointIndex = this.checkpointIndex + 1;
                        this.replyToResend.remove(Integer.valueOf(this.checkpointIndex + 1));
                        this.requestToResend.remove(Integer.valueOf(this.checkpointIndex + 1));
                        this.checkpointIndex++;
                        this.history = new Vector<>();
                        this.historyBaseIndex = this.deliveredRequestsCounter + 1;
                        this.lastCommitedIndex = this.deliveredRequestsCounter;
                        Hashtable<Integer, Vector<RequestLog>> hashtable = this.requestToResend;
                        this.requestToResend = null;
                        Hashtable<Integer, Vector<ReplyLog>> hashtable2 = this.replyToResend;
                        this.replyToResend = null;
                        Vector<UniqueID> vector = this.history;
                        this.history = null;
                        Vector<AwaitedRequest> vector2 = this.awaitedRequests;
                        this.awaitedRequests = null;
                        checkpointInfoCIC.lastRcvdRequestIndex = this.deliveredRequestsCounter;
                        setCheckpointTag(true);
                        checkpoint = new Checkpoint(this.owner, this.additionalCodebase);
                        checkpoint.setCheckpointInfo(checkpointInfoCIC);
                        this.storage.storeCheckpoint(checkpoint, this.incarnation);
                        setCheckpointTag(false);
                        this.replyToResend = hashtable2;
                        this.requestToResend = hashtable;
                        this.history = vector;
                        this.awaitedRequests = vector2;
                        this.completingCheckpoint = true;
                        this.checkpointTimer = System.currentTimeMillis();
                    }
                } catch (RemoteException e) {
                    logger.error("[CIC] Unable to send checkpoint to the server");
                    e.printStackTrace();
                    this.owner.acceptCommunication();
                    return null;
                }
            } finally {
                this.owner.acceptCommunication();
            }
        }
        return checkpoint;
    }

    private HistoryUpdater commitHistories(int i, long j, boolean z, boolean z2) {
        HistoryUpdater historyUpdater;
        synchronized (this.historyLock) {
            if (z2) {
                if (this.historyIndex >= i) {
                    return null;
                }
            }
            List<UniqueID> historyToCommit = getHistoryToCommit(this.lastCommitedIndex + 1, j);
            if (historyToCommit == null) {
                historyUpdater = new HistoryUpdater(historyToCommit, 0L, 0L, this.ownerID, i, this.incarnation);
                this.historyIndex = this.checkpointIndex;
            } else {
                historyUpdater = new HistoryUpdater(historyToCommit, this.lastCommitedIndex + 1, j, this.ownerID, i, this.incarnation);
                this.historyIndex = this.checkpointIndex;
                this.lastCommitedIndex = j;
                deleteCommitedHistory(historyUpdater.base, historyUpdater.last);
            }
            if (z) {
                try {
                    this.storage.commitHistory(historyUpdater);
                } catch (RemoteException e) {
                    logger.error("[ERROR] Storage server is not reachable !");
                    e.printStackTrace();
                }
            }
            return historyUpdater;
        }
    }

    private List<UniqueID> getHistoryToCommit(long j, long j2) {
        if (j == j2 + 1) {
            return null;
        }
        int i = (int) (j - this.historyBaseIndex);
        int i2 = (int) (j2 - this.historyBaseIndex);
        Vector vector = new Vector(i2 - i);
        Iterator<UniqueID> it = this.history.iterator();
        for (int i3 = 0; i3 < i; i3++) {
            it.next();
        }
        for (int i4 = i; i4 <= i2; i4++) {
            vector.add(it.next());
        }
        return vector;
    }

    private void deleteCommitedHistory(long j, long j2) {
        if (j2 < this.historyBaseIndex || j < this.historyBaseIndex) {
            throw new ProtocolErrorException("Deleting from " + j + " up to " + j2 + " while local is from " + this.historyBaseIndex + " up to " + this.deliveredRequestsCounter);
        }
        if (this.history.isEmpty()) {
            return;
        }
        this.history.subList((int) (j - this.historyBaseIndex), (int) ((j2 - this.historyBaseIndex) + 1)).clear();
        this.historyBaseIndex = j2 + 1;
    }

    private void sendLogs(CheckpointInfoCIC checkpointInfoCIC) {
        Iterator<ReplyLog> it = checkpointInfoCIC.replyToResend.iterator();
        while (it.hasNext()) {
            ReplyLog next = it.next();
            sendReply(next.getReply(), next.getDestination());
        }
        Iterator<RequestLog> it2 = checkpointInfoCIC.requestToResend.iterator();
        while (it2.hasNext()) {
            try {
                RequestLog next2 = it2.next();
                Request request = next2.getRequest();
                sendRequest(new RequestImpl(request.getMethodCall(), this.owner.getRemoteAdapter(), request.isOneWay(), request.getSequenceNumber()), next2.getDestination());
            } catch (CommunicationForbiddenException e) {
                e.printStackTrace();
            } catch (RenegotiateSessionException e2) {
                e2.printStackTrace();
            }
        }
    }

    private void filterQueue(BlockingRequestQueue blockingRequestQueue, CheckpointInfoCIC checkpointInfoCIC) {
        ListIterator<Request> listIterator = ((BlockingRequestQueueImpl) blockingRequestQueue).getInternalQueue().listIterator();
        while (listIterator.hasNext()) {
            Request next = listIterator.next();
            MessageInfoCIC messageInfoCIC = (MessageInfoCIC) next.getMessageInfo();
            if (messageInfoCIC == null) {
                if (next instanceof AwaitedRequest) {
                    this.awaitedRequests.add((AwaitedRequest) next);
                }
            } else if (messageInfoCIC.isOrphanFor <= checkpointInfoCIC.checkpointIndex) {
                AwaitedRequest awaitedRequest = new AwaitedRequest(next.getSourceBodyID());
                listIterator.set(awaitedRequest);
                this.awaitedRequests.add(awaitedRequest);
            }
        }
    }

    private void extendRequestLog(int i) {
        if (this.latestRequestLog < i) {
            for (int i2 = this.latestRequestLog + 1; i2 <= i; i2++) {
                this.requestToResend.put(Integer.valueOf(i2), new Vector<>());
            }
            this.latestRequestLog = i;
        }
    }

    private void extendReplyLog(int i) {
        if (this.latestReplyLog < i) {
            for (int i2 = this.latestReplyLog + 1; i2 <= i; i2++) {
                this.replyToResend.put(Integer.valueOf(i2), new Vector<>());
            }
            this.latestReplyLog = i;
        }
    }

    public String toString() {
        return " Incarnation = " + this.incarnation;
    }

    @Override // org.objectweb.proactive.core.body.ft.protocols.FTManager
    public Object handleFTMessage(FTMessage fTMessage) {
        return fTMessage.handleFTMessage(this);
    }

    public HistoryUpdater handlingGSCEEvent(GlobalStateCompletion globalStateCompletion) {
        HistoryUpdater commitHistories = commitHistories(this.checkpointIndex, this.deliveredRequestsCounter, false, true);
        if (this.completingCheckpoint) {
            this.completingCheckpoint = false;
        }
        return commitHistories;
    }

    public HistoryUpdater handlingOCEvent(OutputCommit outputCommit) {
        return commitHistories(this.completingCheckpoint ? this.checkpointIndex - 1 : this.checkpointIndex, outputCommit.getLastIndexToRetreive(), false, false);
    }
}
