package ningyuan.pan.eventstream;

import java.io.Serializable;
import ningyuan.pan.util.Buffer;
import play.Logger;
import play.core.enhancers.PropertiesEnhancer;
import play.libs.F;
import play.mvc.Results;

@PropertiesEnhancer.GeneratedAccessor
@PropertiesEnhancer.RewrittenAccessor
/* loaded from: input_file:ningyuan/pan/eventstream/HTTPBufferStream.class */
public class HTTPBufferStream extends Results.StringChunks implements HTTPStreamObserver, Serializable {
    private static final long serialVersionUID = 1;
    private Buffer msgBuffer;
    private volatile boolean disconnected;
    private final String id;
    private final String RDFSyntax;
    private final EventStreamManager manager;
    private final Thread writingt;

    public HTTPBufferStream(String str, String str2, String str3, EventStreamManager eventStreamManager) {
        super(str3);
        this.msgBuffer = new Buffer(1024);
        this.disconnected = false;
        this.id = str;
        this.RDFSyntax = str2;
        this.manager = eventStreamManager;
        this.manager.addObserver(this);
        this.writingt = Thread.currentThread();
    }

    public void onReady(Results.Chunks.Out<String> out) {
        out.onDisconnected(new F.Callback0() { // from class: ningyuan.pan.eventstream.HTTPBufferStream.1
            public void invoke() throws Throwable {
                HTTPBufferStream.this.disconnected = true;
                HTTPBufferStream.this.writingt.interrupt();
            }
        });
        while (!this.disconnected) {
            System.out.println("<" + this.id + "> EChunks await: ");
            try {
                EventRDFSyntaxTranslator eventRDFSyntaxTranslator = (EventRDFSyntaxTranslator) this.msgBuffer.get();
                Logger.info("Buffer " + this.msgBuffer.getSize());
                out.write(String.valueOf(eventRDFSyntaxTranslator.getEventInSyntax(this.RDFSyntax)) + "\n");
                System.out.println("<" + this.id + "> EChunks write: ");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.manager.removeObserver(this);
        out.close();
    }

    @Override // ningyuan.pan.eventstream.HTTPStreamObserver
    public void update(EventRDFSyntaxTranslator eventRDFSyntaxTranslator) {
        try {
            this.msgBuffer.add(eventRDFSyntaxTranslator);
            Logger.info("Buffer " + this.msgBuffer.getSize());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("<" + this.id + "> EChunks update: ");
    }
}
