package ningyuan.pan.eventstream;

import fr.inria.eventcloud.adapters.rdf2go.SubscribeRdf2goAdapter;
import fr.inria.eventcloud.api.Subscription;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import ningyuan.pan.util.Buffer;
import org.ontoware.rdf2go.model.Model;
import play.Logger;
import play.core.enhancers.PropertiesEnhancer;

@PropertiesEnhancer.GeneratedAccessor
@PropertiesEnhancer.RewrittenAccessor
/* loaded from: input_file:ningyuan/pan/eventstream/EventStreamManager.class */
public class EventStreamManager implements Serializable, Runnable {
    private static final long serialVersionUID = 1;
    private Subscription sub;
    private String query;
    private final String streamName;
    private final SubscribeRdf2goAdapter subscribeProxy;
    private final EventStreamListener listener;
    private Thread thread;
    private volatile boolean connected = false;
    private List<HTTPStreamObserver> observers = new ArrayList();
    private final ReentrantLock lock = new ReentrantLock();
    private Buffer buffer = new Buffer(2048);

    public EventStreamManager(SubscribeRdf2goAdapter subscribeRdf2goAdapter, String str, String str2) {
        this.subscribeProxy = subscribeRdf2goAdapter;
        this.query = str;
        this.streamName = new String(str2);
        this.listener = new EventStreamListener(this.streamName);
    }

    public boolean isConnected() {
        return this.connected;
    }

    public String getQueryContent() {
        return this.query;
    }

    public void setQueryContent(String str) {
        if (str == null) {
            throw new IllegalArgumentException();
        }
        this.query = str;
    }

    @Override // java.lang.Runnable
    public void run() {
    }

    public void notifyObserver(Model model) {
        EventRDFSyntaxTranslator eventRDFSyntaxTranslator = new EventRDFSyntaxTranslator(model);
        Iterator<HTTPStreamObserver> it = this.observers.iterator();
        while (it.hasNext()) {
            it.next().update(eventRDFSyntaxTranslator);
        }
    }

    public void addObserver(HTTPStreamObserver hTTPStreamObserver) {
        try {
            this.lock.lock();
            Logger.info("Add HTTPStream");
            if (this.observers.isEmpty() || !this.connected) {
                this.observers.add(hTTPStreamObserver);
                subscribe();
            } else {
                this.observers.add(hTTPStreamObserver);
            }
        } finally {
            this.lock.unlock();
        }
    }

    public void removeObserver(HTTPStreamObserver hTTPStreamObserver) {
        try {
            this.lock.lock();
            this.observers.remove(hTTPStreamObserver);
            Logger.info("Remove HTTPStream");
            if (this.observers.isEmpty() && this.connected) {
                unsubscribe();
            }
        } finally {
            this.lock.unlock();
        }
    }

    public void disconnect() {
        try {
            this.lock.lock();
            if (this.connected) {
                unsubscribe();
            }
        } finally {
            this.lock.unlock();
        }
    }

    private void subscribe() {
        this.connected = true;
        this.sub = new Subscription(this.query);
        this.subscribeProxy.subscribe(this.sub, this.listener);
        Logger.info("Subscribed");
    }

    private void unsubscribe() {
        this.subscribeProxy.unsubscribe(this.sub.getId());
        this.connected = false;
        Logger.info("Unsubscribed");
    }
}
