package fr.inria.eventcloud.proxies;

import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.sparql.engine.binding.Binding;
import fr.inria.eventcloud.api.CompoundEvent;
import fr.inria.eventcloud.api.PublishSubscribeConstants;
import fr.inria.eventcloud.api.Quadruple;
import fr.inria.eventcloud.api.QuadruplePattern;
import fr.inria.eventcloud.api.SubscriptionId;
import fr.inria.eventcloud.api.listeners.BindingNotificationListener;
import fr.inria.eventcloud.api.listeners.BindingWrapperNotificationListener;
import fr.inria.eventcloud.api.listeners.CompoundEventNotificationListener;
import fr.inria.eventcloud.api.listeners.NotificationListener;
import fr.inria.eventcloud.api.listeners.NotificationListenerType;
import fr.inria.eventcloud.api.listeners.SignalNotificationListener;
import fr.inria.eventcloud.api.properties.AlterableElaProperty;
import fr.inria.eventcloud.api.wrappers.BindingWrapper;
import fr.inria.eventcloud.configuration.EventCloudProperties;
import fr.inria.eventcloud.datastore.Vars;
import fr.inria.eventcloud.messages.request.can.IndexSubscriptionRequest;
import fr.inria.eventcloud.messages.request.can.ReconstructCompoundEventRequest;
import fr.inria.eventcloud.messages.request.can.UnsubscribeRequest;
import fr.inria.eventcloud.messages.response.can.QuadruplePatternResponse;
import fr.inria.eventcloud.pubsub.Notification;
import fr.inria.eventcloud.pubsub.NotificationId;
import fr.inria.eventcloud.pubsub.PublishSubscribeUtils;
import fr.inria.eventcloud.pubsub.Solution;
import fr.inria.eventcloud.pubsub.Subscription;
import fr.inria.eventcloud.pubsub.Subsubscription;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.cxf.phase.Phase;
import org.apache.jdbm.DB;
import org.apache.jdbm.DBMaker;
import org.objectweb.proactive.Body;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.api.PAFuture;
import org.objectweb.proactive.core.component.body.ComponentEndActive;
import org.objectweb.proactive.extensions.p2p.structured.configuration.P2PStructuredProperties;
import org.objectweb.proactive.extensions.p2p.structured.proxies.Proxies;
import org.objectweb.proactive.extensions.p2p.structured.utils.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:fr/inria/eventcloud/proxies/SubscribeProxyImpl.class */
public class SubscribeProxyImpl extends Proxy implements ComponentEndActive, SubscribeProxy, SubscribeProxyAttributeController {
    /** Serialization version identifier. */
    private static final long serialVersionUID = 1;
    /** Name of the hash map created inside {@link #eventIdsReceivedDB} to record received event ids. */
    private static final String DB_NAME = "eventIdsReceived";
    /** NOTE(review): presumably the ADL definition name of this component; not referenced in this file. */
    public static final String SUBSCRIBE_PROXY_ADL = "fr.inria.eventcloud.proxies.SubscribeProxy";
    /** Interface name passed to the component lookup performed by {@link #lookup(String)}. */
    public static final String SUBSCRIBE_SERVICES_ITF = "subscribe-services";
    /** Class logger; note that it is registered under the {@link SubscribeProxy} interface name. */
    private static final Logger log = LoggerFactory.getLogger(SubscribeProxy.class);
    /** Subscriptions registered through this proxy, keyed by subscription id. Created in setAttributes. */
    private ConcurrentMap<SubscriptionId, Subscription> subscriptions;
    /** Notification listener associated with each subscription id. Created in setAttributes. */
    private ConcurrentMap<SubscriptionId, NotificationListener<?>> listeners;
    /** Solutions accumulated per notification id by receive() and removed again when delivered. */
    private ConcurrentMap<NotificationId, Solution> solutions;
    /** JDBM database holding the {@link #DB_NAME} map; opened in initComponentActivity, closed in unsubscribeAndCloseDB. */
    private DB eventIdsReceivedDB;
    /** URI of this component as supplied to setAttributes. */
    private String componentUri;

    /**
     * Initializes the component activity: delegates to the parent class, configures the
     * immediate-service setting of three methods on the body and opens the database that
     * records the event ids already received.
     *
     * @param body the body of the active object hosting this component
     */
    @Override // fr.inria.eventcloud.proxies.Proxy, org.objectweb.proactive.extensions.p2p.structured.AbstractComponent, org.objectweb.proactive.core.component.body.ComponentInitActive
    public void initComponentActivity(Body body) {
        super.initComponentActivity(body);
        // NOTE(review): Phase.RECEIVE is a CXF constant used here as a method name; presumably its
        // value matches receive(Notification) - confirm against the CXF Phase class.
        final String[] methodNames = {"setImmediateServices", "setAttributes", Phase.RECEIVE};
        for (String methodName : methodNames) {
            body.setImmediateService(methodName, false);
        }
        createAndRegisterEventIdsReceivedDB(body);
    }

    /**
     * Terminates the component activity by removing every registered subscription and
     * closing the database of received event ids.
     *
     * @param body the body of the active object hosting this component (unused)
     */
    @Override // org.objectweb.proactive.core.component.body.ComponentEndActive
    public void endComponentActivity(Body body) {
        this.unsubscribeAndCloseDB();
    }

    /**
     * Opens the JDBM database used to remember which event ids have been received and
     * creates the {@code DB_NAME} hash map inside it. The database file lives in a
     * {@code jdbm} sub-directory of the default temporary path and is named after the
     * body identifier.
     *
     * @param body the body whose identifier names the database file
     */
    private void createAndRegisterEventIdsReceivedDB(Body body) {
        final String jdbmDirectory =
                EventCloudProperties.getDefaultTemporaryPath() + "jdbm" + File.separatorChar;
        final File directory = new File(jdbmDirectory);
        // The result of mkdirs() is deliberately not inspected, exactly as before.
        directory.mkdirs();

        final String databasePath = jdbmDirectory + body.getID();
        this.eventIdsReceivedDB = DBMaker.openFile(databasePath)
                .deleteFilesAfterClose()
                .disableLocking()
                .disableTransactions()
                .enableSoftCache()
                .make();
        this.eventIdsReceivedDB.createHashMap(DB_NAME);
    }

    /**
     * Configures this proxy. Only the first call has an effect: once
     * {@code eventCloudCache} is set, later invocations are ignored.
     * <p>
     * The first call stores the cache, creates the underlying proxy from the cache
     * trackers, records the component URI, creates the internal maps and registers a JVM
     * shutdown hook that unsubscribes everything and closes the event-id database.
     *
     * @param eventCloudCache the cache giving access to the trackers
     * @param str the URI of this component
     * @param alterableElaPropertyArr not used by this implementation
     */
    @Override // fr.inria.eventcloud.proxies.SubscribeProxyAttributeController
    public void setAttributes(EventCloudCache eventCloudCache, String str, AlterableElaProperty[] alterableElaPropertyArr) {
        if (this.eventCloudCache == null) {
            this.eventCloudCache = eventCloudCache;
            this.proxy = Proxies.newProxy(this.eventCloudCache.getTrackers());
            this.componentUri = str;
            // Explicit type arguments instead of raw types, so no unchecked conversion is needed.
            this.subscriptions = new ConcurrentHashMap<SubscriptionId, Subscription>();
            this.listeners = new ConcurrentHashMap<SubscriptionId, NotificationListener<?>>();
            this.solutions = new ConcurrentHashMap<NotificationId, Solution>();
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { // from class: fr.inria.eventcloud.proxies.SubscribeProxyImpl.1
                @Override // java.lang.Runnable
                public void run() {
                    SubscribeProxyImpl.this.unsubscribeAndCloseDB();
                }
            }));
        }
    }

    /**
     * Removes every registered subscription and then closes the database of received
     * event ids. Does nothing when the database was never opened or is already closed.
     * <p>
     * Cleanup is best effort: a failure while unsubscribing one subscription is logged
     * and does not prevent the remaining subscriptions from being processed, and the
     * database is closed in all cases.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void unsubscribeAndCloseDB() {
        // The database is opened in initComponentActivity; it may be absent if that never ran.
        if (this.eventIdsReceivedDB == null || this.eventIdsReceivedDB.isClosed()) {
            return;
        }
        try {
            // The map is created in setAttributes; it may be absent if that was never called.
            if (this.subscriptions != null) {
                // unsubscribe() removes the entry itself, so no explicit iterator removal is needed.
                for (SubscriptionId subscriptionId : this.subscriptions.keySet()) {
                    try {
                        unsubscribe(subscriptionId);
                    } catch (RuntimeException e) {
                        log.warn("Unable to unsubscribe " + subscriptionId + " while shutting down", e);
                    }
                }
            }
        } finally {
            this.eventIdsReceivedDB.close();
        }
    }

    /**
     * Registers a subscription together with the listener to notify, then sends the
     * request that indexes the subscription.
     * <p>
     * When the listener is a {@link CompoundEventNotificationListener}, the SPARQL query
     * is rewritten with {@code PublishSubscribeUtils.removeResultVarsExceptGraphVar}
     * before the internal subscription is built.
     *
     * @param subscription the subscription to register
     * @param notificationListener the listener associated with the subscription
     * @param <T> the type of value the listener receives
     * @throws IllegalArgumentException if a listener or a subscription is already
     *         registered under the same subscription id; the existing registration is
     *         left untouched in that case
     */
    @Override // fr.inria.eventcloud.api.SubscribeApi
    public <T> void subscribe(fr.inria.eventcloud.api.Subscription subscription, NotificationListener<T> notificationListener) {
        String sparqlQuery = subscription.getSparqlQuery();
        if (notificationListener instanceof CompoundEventNotificationListener) {
            sparqlQuery = PublishSubscribeUtils.removeResultVarsExceptGraphVar(sparqlQuery);
        }
        Subscription internalSubscription = new Subscription(subscription.getId(), null, subscription.getId(), subscription.getCreationTime(), sparqlQuery, this.componentUri, subscription.getSubscriptionDestination(), notificationListener.getType());
        // putIfAbsent rather than put: a duplicate id must not overwrite the existing entry
        // before the exception is thrown.
        if (this.listeners.putIfAbsent(subscription.getId(), notificationListener) != null) {
            throw new IllegalArgumentException("Listener already exists for subscription id: " + subscription.getId());
        }
        if (this.subscriptions.putIfAbsent(internalSubscription.getId(), internalSubscription) != null) {
            // Roll back the listener registered just above so both maps stay consistent.
            this.listeners.remove(subscription.getId(), notificationListener);
            throw new IllegalArgumentException("Subscription already exists for subscription id: " + subscription.getId());
        }
        super.sendv(new IndexSubscriptionRequest(internalSubscription));
        log.info("New subscription has been registered from {} with id {}", PAActiveObject.getBodyOnThis().getUrl(), subscription.getId());
    }

    /**
     * Returns the value bound to the graph variable in the given binding.
     *
     * @param subscription the subscription the binding belongs to
     * @param binding the binding to read the graph value from
     * @return the node bound to {@code Vars.GRAPH}
     * @throws IllegalArgumentException if the subscription graph node is not a variable
     *         or if the binding holds no graph value
     */
    private Node extractEventId(Subscription subscription, Binding binding) {
        if (subscription.getGraphNode().isVariable()) {
            final Node graphValue = binding.get(Vars.GRAPH);
            if (graphValue != null) {
                return graphValue;
            }
            throw new IllegalArgumentException("The specified binding does not contain a graph value");
        }
        throw new IllegalArgumentException("The subscription graph node is not a variable");
    }

    /**
     * Reconstructs the compound event identified by the graph value found in the given
     * binding, on behalf of the given subscription.
     *
     * @param subscription the subscription the binding was matched for
     * @param binding the binding carrying the graph value
     * @return the result of the reconstruction by subscription id and event id
     * @throws IllegalArgumentException if no event id can be extracted from the binding
     */
    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    public final CompoundEvent reconstructCompoundEvent(Subscription subscription, Binding binding) {
        final SubscriptionId subscriptionId = subscription.getId();
        final Node eventId = extractEventId(subscription, binding);
        return reconstructCompoundEvent(subscriptionId, eventId);
    }

    /**
     * Reconstructs a compound event by repeatedly asking a peer for the quadruples that
     * share the given graph value, until as many quadruples have been collected as the
     * number announced by the quadruple whose predicate is
     * {@code PublishSubscribeConstants.EVENT_NB_QUADRUPLES_NODE}.
     * <p>
     * Between two incomplete attempts the calling thread sleeps for
     * {@code EventCloudProperties.RECONSTRUCTION_RETRY_THRESHOLD}. No sleep is performed
     * once the event is complete.
     *
     * @param subscriptionId the subscription the event is reconstructed for
     * @param node the event id, which must be a URI node
     * @return the reconstructed event, or {@code null} when the event id is already
     *         recorded as received or when the calling thread is interrupted while
     *         waiting for missing quadruples
     * @throws IllegalArgumentException if {@code node} is not a URI
     */
    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    public final CompoundEvent reconstructCompoundEvent(SubscriptionId subscriptionId, Node node) {
        if (!node.isURI()) {
            throw new IllegalArgumentException("The event id must be an URI:" + node);
        }
        final String eventUri = node.getURI();
        if (getEventIdsReceived().containsKey(eventUri)) {
            return null;
        }
        // -1 until the quadruple announcing the expected number of quadruples is seen.
        int expectedNumberOfQuadruples = -1;
        final List<Quadruple> quadruplesReceived = new ArrayList<Quadruple>();
        // Left raw on purpose: the element type expected by ReconstructCompoundEventRequest
        // is not visible from this file.
        final HashSet hashValuesReceived = new HashSet();
        final QuadruplePattern pattern = new QuadruplePattern(node, Node.ANY, Node.ANY, Node.ANY);
        while (true) {
            // The event may have been recorded in the meantime; stop early in that case.
            if (getEventIdsReceived().containsKey(eventUri)) {
                return null;
            }
            log.info("Reconstructing compound event for subscription {} and graph value {} ({}/{})", new Object[]{subscriptionId, node, Integer.valueOf(quadruplesReceived.size()), Integer.valueOf(expectedNumberOfQuadruples)});
            for (Quadruple quadruple : ((QuadruplePatternResponse) PAFuture.getFutureValue(super.selectPeer().send(new ReconstructCompoundEventRequest(pattern, hashValuesReceived)))).getResult()) {
                if (quadruple.getPredicate().equals(PublishSubscribeConstants.EVENT_NB_QUADRUPLES_NODE)) {
                    expectedNumberOfQuadruples = ((Integer) quadruple.getObject().getLiteralValue()).intValue();
                }
                quadruplesReceived.add(quadruple);
                hashValuesReceived.add(quadruple.hashValue());
            }
            if (quadruplesReceived.size() == expectedNumberOfQuadruples) {
                // Complete: leave without paying one more retry delay.
                break;
            }
            try {
                Thread.sleep(EventCloudProperties.RECONSTRUCTION_RETRY_THRESHOLD.getValue().intValue());
            } catch (InterruptedException e) {
                // Restore the flag and give up: with the flag set every further sleep would
                // fail immediately and the loop would retry without any delay.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while reconstructing compound event {} for subscription {}", node, subscriptionId);
                return null;
            }
        }
        if (getEventIdsReceived().putIfAbsent(eventUri, subscriptionId) != null) {
            return null;
        }
        return new CompoundEvent(quadruplesReceived, false);
    }

    /**
     * Removes the subscription registered under the given id, its listener and every
     * received-event entry recorded for it, then sends one unsubscribe request per
     * sub-subscription.
     *
     * @param subscriptionId the id of the subscription to remove
     * @throws IllegalArgumentException if no subscription is registered under that id
     */
    @Override // fr.inria.eventcloud.api.SubscribeApi
    public void unsubscribe(SubscriptionId subscriptionId) {
        // A single atomic remove replaces the containsKey/remove pair, so two concurrent
        // callers cannot both pass the check and one of them dereference null.
        final Subscription removed = this.subscriptions.remove(subscriptionId);
        if (removed == null) {
            throw new IllegalArgumentException("No subscription registered with the specified subscription id: " + subscriptionId);
        }
        this.listeners.remove(subscriptionId);
        // Collection.remove deletes at most one matching value per call, while several event
        // ids can be recorded for the same subscription: repeat until none is left.
        final ConcurrentMap<Object, Object> eventIdsReceived = getEventIdsReceived();
        while (eventIdsReceived.values().remove(subscriptionId)) {
            // keep removing
        }
        for (Subsubscription subsubscription : removed.getSubSubscriptions()) {
            super.selectPeer().send(new UnsubscribeRequest(removed.getOriginalId(), subsubscription.getAtomicQuery(), removed.getType() == NotificationListenerType.BINDING));
        }
    }

    /**
     * Removes the solution stored for the given notification and hands it to the
     * listener registered for the corresponding subscription, dispatching on the
     * concrete listener type.
     * <p>
     * Nothing is delivered when the solution has already been removed (for instance by a
     * concurrent delivery of the same notification), when no listener is registered any
     * more for the subscription, or when the listener type is not supported.
     *
     * @param notificationId identifies the notification whose solution is delivered
     */
    private void deliver(NotificationId notificationId) {
        final SubscriptionId subscriptionId = notificationId.getSubscriptionId();
        NotificationListener<?> notificationListener = this.listeners.get(subscriptionId);
        Solution solution = this.solutions.remove(notificationId);
        if (solution == null) {
            log.debug("No pending solution for notification {}, nothing to deliver", notificationId);
            return;
        }
        if (notificationListener == null) {
            log.debug("No listener registered for subscription {}, notification {} dropped", subscriptionId, notificationId);
            return;
        }
        if (notificationListener instanceof BindingNotificationListener) {
            deliver(notificationId, (BindingNotificationListener) notificationListener, solution);
        } else if (notificationListener instanceof BindingWrapperNotificationListener) {
            deliver(notificationId, (BindingWrapperNotificationListener) notificationListener, solution);
        } else if (notificationListener instanceof CompoundEventNotificationListener) {
            deliver(notificationId, (CompoundEventNotificationListener) notificationListener, solution);
        } else if (notificationListener instanceof SignalNotificationListener) {
            deliver(notificationId, (SignalNotificationListener) notificationListener, solution);
        } else {
            log.error("Unknown notification listener for delivery: {}", notificationListener.getClass());
            // Nothing was handed to a listener, so do not report a delivery.
            return;
        }
        log.info("Notification {} has been delivered", notificationId);
        // The subscription may have been removed while the listener was running.
        Subscription subscription = this.subscriptions.get(subscriptionId);
        if (subscription != null) {
            log.info("EventCloud Exit " + Quadruple.removeMetaInformation(extractEventId(subscription, solution.getSolution())));
        }
    }

    /**
     * Notifies a binding listener with the solution binding, then sends a monitoring
     * report if the event has not been recorded yet.
     *
     * @param notificationId identifies the notification being delivered
     * @param bindingNotificationListener the listener to notify
     * @param solution the solution holding the binding to deliver
     */
    private final void deliver(NotificationId notificationId, BindingNotificationListener bindingNotificationListener, Solution solution) {
        final SubscriptionId subscriptionId = notificationId.getSubscriptionId();
        bindingNotificationListener.onNotification(subscriptionId, solution.getSolution());

        final String subscriberUrl = bindingNotificationListener.getSubscriberUrl();
        sendInputOutputMonitoringReportIfNecessary(subscriptionId, solution, subscriberUrl);
    }

    /**
     * Notifies a binding-wrapper listener with the solution binding wrapped in a
     * {@link BindingWrapper}, then sends a monitoring report if the event has not been
     * recorded yet.
     *
     * @param notificationId identifies the notification being delivered
     * @param bindingWrapperNotificationListener the listener to notify
     * @param solution the solution holding the binding to deliver
     */
    private final void deliver(NotificationId notificationId, BindingWrapperNotificationListener bindingWrapperNotificationListener, Solution solution) {
        final SubscriptionId subscriptionId = notificationId.getSubscriptionId();
        final BindingWrapper wrappedBinding = new BindingWrapper(solution.getSolution());
        bindingWrapperNotificationListener.onNotification(subscriptionId, wrappedBinding);

        final String subscriberUrl = bindingWrapperNotificationListener.getSubscriberUrl();
        sendInputOutputMonitoringReportIfNecessary(subscriptionId, solution, subscriberUrl);
    }

    /**
     * Reconstructs the compound event matching the solution and notifies a
     * compound-event listener with it, then sends a monitoring report.
     * <p>
     * Nothing is delivered when the subscription is no longer registered or when the
     * reconstruction yields {@code null}.
     *
     * @param notificationId identifies the notification being delivered
     * @param compoundEventNotificationListener the listener to notify
     * @param solution the solution holding the binding that identifies the event
     */
    private final void deliver(NotificationId notificationId, CompoundEventNotificationListener compoundEventNotificationListener, Solution solution) {
        final SubscriptionId subscriptionId = notificationId.getSubscriptionId();
        final Subscription subscription = this.subscriptions.get(subscriptionId);
        if (subscription == null) {
            // Unsubscribed in the meantime: there is nothing to reconstruct for.
            return;
        }
        CompoundEvent reconstructCompoundEvent = reconstructCompoundEvent(subscription, solution.getSolution());
        // The null check must come before the benchmark logging, which dereferences the event.
        if (reconstructCompoundEvent == null) {
            return;
        }
        if (P2PStructuredProperties.ENABLE_BENCHMARKS_INFORMATION.getValue().booleanValue()) {
            for (int i = 0; i < reconstructCompoundEvent.size(); i++) {
                log.info("Reconstructed compound event containing quadruple : " + reconstructCompoundEvent.getQuadruples().get(i));
            }
        }
        // The reconstruction can take a while; re-check that the subscription still exists.
        if (!this.subscriptions.containsKey(subscriptionId)) {
            return;
        }
        compoundEventNotificationListener.onNotification(subscriptionId, reconstructCompoundEvent);
        sendInputOutputMonitoringReport(subscriptionId, solution.getSolution(), compoundEventNotificationListener.getSubscriberUrl());
    }

    /**
     * Notifies a signal listener that a notification arrived for its subscription, then
     * sends a monitoring report if the event has not been recorded yet.
     *
     * @param notificationId identifies the notification being delivered
     * @param signalNotificationListener the listener to notify
     * @param solution the solution used to identify the event for the monitoring report
     */
    private final void deliver(NotificationId notificationId, SignalNotificationListener signalNotificationListener, Solution solution) {
        final SubscriptionId subscriptionId = notificationId.getSubscriptionId();
        signalNotificationListener.onNotification(subscriptionId);

        final String subscriberUrl = signalNotificationListener.getSubscriberUrl();
        sendInputOutputMonitoringReportIfNecessary(subscriptionId, solution, subscriberUrl);
    }

    /**
     * Records the event id carried by the solution as received for the given
     * subscription and sends a monitoring report only when that event id had no
     * previous entry.
     *
     * @param subscriptionId the subscription the solution was matched for
     * @param solution the solution whose binding carries the event id
     * @param str the subscriber URL forwarded to the monitoring report, may be null
     */
    private void sendInputOutputMonitoringReportIfNecessary(SubscriptionId subscriptionId, Solution solution, String str) {
        final Subscription subscription = this.subscriptions.get(subscriptionId);
        final Node eventId = extractEventId(subscription, solution.getSolution());
        final Object previousOwner = getEventIdsReceived().put(eventId.getURI(), subscriptionId);
        if (previousOwner != null) {
            return;
        }
        sendInputOutputMonitoringReport(eventId, str);
    }

    /**
     * Sends a monitoring report for the event id found in the given binding. Does
     * nothing when no monitoring manager is set.
     *
     * @param subscriptionId the subscription the binding was matched for
     * @param binding the binding carrying the event id
     * @param str the subscriber URL forwarded to the monitoring report, may be null
     */
    private void sendInputOutputMonitoringReport(SubscriptionId subscriptionId, Binding binding, String str) {
        if (this.monitoringManager == null) {
            return;
        }
        final Subscription subscription = this.subscriptions.get(subscriptionId);
        final Node eventId = extractEventId(subscription, binding);
        sendInputOutputMonitoringReport(eventId, str);
    }

    /**
     * Sends an input/output monitoring report for the given event id. Does nothing when
     * no monitoring manager is set.
     * <p>
     * The destination is {@code str} when it is non-null and the component URI
     * otherwise; the source falls back to {@code "http://0.0.0.0"} when the event id
     * yields no publication source.
     *
     * @param node the event id the publication source and time are read from
     * @param str the subscriber URL, may be null
     */
    private void sendInputOutputMonitoringReport(Node node, String str) {
        if (this.monitoringManager == null) {
            return;
        }
        final String destination = (str != null) ? str : this.componentUri;
        final String extractedSource = Quadruple.getPublicationSource(node);
        final String source = (extractedSource == null) ? "http://0.0.0.0" : extractedSource;
        this.monitoringManager.sendInputOutputMonitoringReport(source, destination, Quadruple.getPublicationTime(node));
    }

    /**
     * Handles an incoming notification. Notifications for unknown subscriptions are
     * ignored. Otherwise the notification binding is merged into the solution kept for
     * the notification id (a new solution is created on first sight), and the solution
     * is delivered when it is ready or when the subscription type is
     * {@code COMPOUND_EVENT} or {@code SIGNAL}.
     *
     * @param notification the notification received
     */
    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    public void receive(Notification notification) {
        final SubscriptionId subscriptionId = notification.getId().getSubscriptionId();
        final Subscription subscription = this.subscriptions.get(subscriptionId);
        if (subscription != null) {
            log.debug("New notification received {} on {} for subscription id {}", new Object[]{notification, this.componentUri, subscriptionId});

            Solution solution = this.solutions.get(notification.getId());
            if (solution != null) {
                solution.addSubSolution(notification.getBinding());
            } else {
                final Solution created = new Solution(subscription.getSubSubscriptions().length, notification.getBinding());
                final Solution existing = this.solutions.putIfAbsent(notification.getId(), created);
                if (existing == null) {
                    solution = created;
                } else {
                    // Another caller registered a solution first: merge into that one.
                    solution = existing;
                    solution.addSubSolution(notification.getBinding());
                }
            }

            final boolean deliverNow = solution.isReady()
                    || subscription.getType() == NotificationListenerType.COMPOUND_EVENT
                    || subscription.getType() == NotificationListenerType.SIGNAL;
            if (deliverNow) {
                deliver(notification.getId());
            }
        }
    }

    /**
     * Looks up the {@code DB_NAME} hash map in the event-id database.
     *
     * @return the map that associates received event ids with subscription ids
     */
    private ConcurrentMap<Object, Object> getEventIdsReceived() {
        final ConcurrentMap<Object, Object> eventIdsReceived = this.eventIdsReceivedDB.getHashMap(DB_NAME);
        return eventIdsReceived;
    }

    /**
     * Looks up a registered subscription.
     *
     * @param subscriptionId the id of the subscription to look up
     * @return the subscription registered under that id, or {@code null} if there is none
     */
    @Override // fr.inria.eventcloud.proxies.SubscribeProxy
    public Subscription find(SubscriptionId subscriptionId) {
        final Subscription registered = this.subscriptions.get(subscriptionId);
        return registered;
    }

    /**
     * Returns the URI of this component.
     *
     * @return the component URI given to setAttributes, or {@code null} if it has not been set
     */
    public String getComponentUri() {
        return componentUri;
    }

    /**
     * Looks up the {@code SUBSCRIBE_SERVICES_ITF} interface of the component bound to
     * the given URL.
     *
     * @param str the URL of the component to look up
     * @return the subscribe proxy interface of that component
     * @throws IOException declared by this method; raised conditions depend on the lookup helper
     */
    @Deprecated
    public static SubscribeProxy lookup(String str) throws IOException {
        return (SubscribeProxy) ComponentUtils.lookupFcInterface(
                str,
                SUBSCRIBE_SERVICES_ITF,
                SubscribeProxy.class);
    }
}
