package eu.play_project.dcep.distributedetalis;

import eu.play_project.dcep.distributedetalis.api.SimplePublishApi;
import eu.play_project.play_commons.constants.Stream;
import eu.play_project.play_platformservices.api.EpSparqlQuery;
import fr.inria.eventcloud.api.CompoundEvent;
import fr.inria.eventcloud.api.EventCloudId;
import fr.inria.eventcloud.api.PublishApi;
import fr.inria.eventcloud.api.PutGetApi;
import fr.inria.eventcloud.api.SubscribeApi;
import fr.inria.eventcloud.api.Subscription;
import fr.inria.eventcloud.api.properties.AlterableElaProperty;
import fr.inria.eventcloud.api.responses.SparqlSelectResponse;
import fr.inria.eventcloud.exceptions.EventCloudIdNotManaged;
import fr.inria.eventcloud.factories.ProxyFactory;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.objectweb.proactive.ActiveObjectCreationException;
import org.objectweb.proactive.api.PAActiveObject;
import org.objectweb.proactive.core.node.NodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/play_project/dcep/distributedetalis/EcConnectionManager.class */
public class EcConnectionManager implements SimplePublishApi, Serializable {
    private static final long serialVersionUID = -368781636399635332L;
    private String eventCloudRegistryUrl;
    private boolean init;
    private EcConnectionListener eventCloudListener;
    private Map<SubscribeApi, SubscriptionUsage> subscriptions = new HashMap();
    private Logger logger = LoggerFactory.getLogger(EcConnectionManager.class);
    private Map<String, PutGetApi> putGetClouds = new HashMap();
    private Map<String, PublishApi> outputClouds = new HashMap();
    private Map<String, SubscribeApi> inputClouds = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:eu/play_project/dcep/distributedetalis/EcConnectionManager$SubscriptionUsage.class */
    public class SubscriptionUsage implements Serializable {
        private static final long serialVersionUID = -6063251924935507681L;
        public Subscription sub;
        public int usage = 1;

        public SubscriptionUsage(Subscription subscription) {
            this.sub = subscription;
        }
    }

    public EcConnectionManager(String str, DistributedEtalis distributedEtalis) {
        this.init = false;
        this.eventCloudRegistryUrl = str;
        try {
            this.eventCloudListener = (EcConnectionListener) PAActiveObject.newActive(EcConnectionListener.class, new Object[0]);
            this.eventCloudListener.setDetalis(distributedEtalis);
            this.init = true;
        } catch (ActiveObjectCreationException e) {
            this.logger.error("Error while initializing event cloud listener.", (Throwable) e);
        } catch (NodeException e2) {
            this.logger.error("Error while initializing event cloud listener.", (Throwable) e2);
        }
    }

    public synchronized SparqlSelectResponse getDataFromCloud(String str, String str2) throws EventCloudIdNotManaged {
        if (!this.init) {
            throw new IllegalStateException(String.valueOf(getClass().getSimpleName()) + " has not been initialized.");
        }
        this.logger.info("Get data from EventCloud '" + str2 + "' with query : " + str);
        try {
            return getHistoricCloud(str2).executeSparqlSelect(str);
        } catch (EventCloudIdNotManaged e) {
            this.logger.error("Error while connecting to event cloud {}.", str2);
            throw e;
        }
    }

    public PutGetApi getHistoricCloud(String str) throws EventCloudIdNotManaged {
        if (!this.init) {
            throw new IllegalStateException(String.valueOf(getClass().getSimpleName()) + " has not been initialized.");
        }
        if (!this.putGetClouds.containsKey(str)) {
            this.putGetClouds.put(str, ProxyFactory.newPutGetProxy(this.eventCloudRegistryUrl, new EventCloudId(Stream.toTopicUri(str))));
        }
        return this.putGetClouds.get(str);
    }

    public SubscribeApi getInputCloud(String str) throws EventCloudIdNotManaged {
        if (!this.init) {
            throw new IllegalStateException(String.valueOf(getClass().getSimpleName()) + " has not been initialized.");
        }
        if (!this.inputClouds.containsKey(str)) {
            this.inputClouds.put(str, ProxyFactory.newSubscribeProxy(this.eventCloudRegistryUrl, new EventCloudId(str), new AlterableElaProperty[0]));
        }
        return this.inputClouds.get(str);
    }

    public PublishApi getOutputCloud(String str) throws EventCloudIdNotManaged {
        if (!this.init) {
            throw new IllegalStateException(String.valueOf(getClass().getSimpleName()) + " has not been initialized.");
        }
        if (!this.outputClouds.containsKey(str)) {
            this.outputClouds.put(str, ProxyFactory.newPublishProxy(this.eventCloudRegistryUrl, new EventCloudId(str)));
        }
        return this.outputClouds.get(str);
    }

    @Override // eu.play_project.dcep.distributedetalis.api.SimplePublishApi
    public void publish(CompoundEvent compoundEvent) {
        if (!this.init) {
            throw new IllegalStateException(String.valueOf(getClass().getSimpleName()) + " has not been initialized.");
        }
        String cloudId = EventCloudHelpers.getCloudId(compoundEvent);
        try {
            getOutputCloud(cloudId).publish(compoundEvent);
        } catch (EventCloudIdNotManaged e) {
            this.logger.error("Event could not be published to cloud {}.", cloudId);
        }
    }

    public void registerEventPattern(EpSparqlQuery epSparqlQuery) {
        Iterator<String> it = epSparqlQuery.getQueryDetails().getInputStreams().iterator();
        while (it.hasNext()) {
            subscribe(it.next());
        }
    }

    public void unregisterEventPattern(EpSparqlQuery epSparqlQuery) {
        for (String str : epSparqlQuery.getQueryDetails().getInputStreams()) {
            try {
                unsubscribe(str, this.subscriptions.get(getInputCloud(str)).sub);
            } catch (EventCloudIdNotManaged e) {
                this.logger.error("Incurred unknown event cloud {}.", str);
            }
        }
    }

    public Subscription subscribe(String str) {
        if (!this.init) {
            throw new IllegalStateException(String.valueOf(getClass().getSimpleName()) + " has not been initialized.");
        }
        Subscription any = Subscription.any();
        try {
            if (this.subscriptions.containsKey(getInputCloud(str))) {
                this.logger.info("Still subscribed to eventcloud {}.", str);
                this.subscriptions.get(getInputCloud(str)).usage++;
            } else {
                this.logger.info("Subscribing to eventcloud {}.", str);
                getInputCloud(str).subscribe(any, this.eventCloudListener);
                this.subscriptions.put(getInputCloud(str), new SubscriptionUsage(any));
            }
        } catch (EventCloudIdNotManaged e) {
            this.logger.error("Incurred unknown event cloud {}.", str);
        }
        return any;
    }

    public void unsubscribe(String str, Subscription subscription) {
        try {
            if (this.subscriptions.containsKey(getInputCloud(str))) {
                this.subscriptions.get(getInputCloud(str)).usage--;
                if (this.subscriptions.get(getInputCloud(str)).usage == 0) {
                    this.logger.info("Unsubscribing from eventcloud {}.", str);
                    getInputCloud(str).unsubscribe(subscription.getId());
                    this.subscriptions.remove(getInputCloud(str));
                    this.inputClouds.remove(str);
                } else {
                    this.logger.info("Still subscribed to eventcloud {}.", str);
                }
            }
        } catch (EventCloudIdNotManaged e) {
            this.logger.error("Incurred unknown event cloud {}.", str);
        }
    }

    public void destroy() {
        this.logger.info("Unsubscribe from Event Clouds");
        for (SubscribeApi subscribeApi : this.subscriptions.keySet()) {
            subscribeApi.unsubscribe(this.subscriptions.get(subscribeApi).sub.getId());
        }
        this.subscriptions.clear();
        this.inputClouds.clear();
        this.outputClouds.clear();
        this.init = false;
    }
}
