package org.objectweb.proactive.extensions.amqp.remoteobject;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.objectweb.proactive.core.ProActiveException;
import org.objectweb.proactive.core.body.future.MethodCallResult;
import org.objectweb.proactive.core.body.request.Request;
import org.objectweb.proactive.core.remoteobject.InternalRemoteRemoteObject;
import org.objectweb.proactive.core.remoteobject.SynchronousReplyImpl;
import org.objectweb.proactive.core.util.URIBuilder;
import org.objectweb.proactive.core.util.converter.ByteToObjectConverter;
import org.objectweb.proactive.core.util.converter.ObjectToByteConverter;
import org.objectweb.proactive.core.util.log.ProActiveLogger;
import org.objectweb.proactive.extensions.amqp.AMQPConfig;
import org.objectweb.proactive.utils.NamedThreadFactory;
import org.objectweb.proactive.utils.ThreadPools;

/* loaded from: input_file:WEB-INF/lib/proactive-programming-bundle-5.2.0-update-10.jar:org/objectweb/proactive/extensions/amqp/remoteobject/AMQPRemoteObjectServer.class */
public class AMQPRemoteObjectServer {
    InternalRemoteRemoteObject rro;
    String queueName;
    private static final Logger logger = ProActiveLogger.getLogger(AMQPConfig.Loggers.AMQP_REMOTE_OBJECT);
    static final ThreadPoolExecutor tpe = ThreadPools.newCachedThreadPool(1, TimeUnit.SECONDS, new NamedThreadFactory("AMQP Consumer Thread ", true));
    Connection connection = null;
    Channel channel = null;
    QueueingConsumer consumer = null;
    String consumerTag = null;
    AMQP.Queue.BindOk queueBind = null;
    AMQP.Queue.DeclareOk ok = null;

    public AMQPRemoteObjectServer(InternalRemoteRemoteObject internalRemoteRemoteObject) throws ProActiveException, IOException {
        this.rro = null;
        this.queueName = null;
        this.rro = internalRemoteRemoteObject;
        this.queueName = AMQPUtils.computeQueueNameFromName(URIBuilder.getNameFromURI(internalRemoteRemoteObject.getURI()));
    }

    public void connect(boolean z) throws IOException, ProActiveException {
        this.channel = AMQPUtils.getChannelToBroker(this.rro.getURI());
        try {
            this.ok = this.channel.queueDeclare(this.queueName, false, false, true, (Map) null);
            logger.debug(String.format("declared queue %s,response %s", this.queueName, this.ok.toString()));
            this.queueBind = this.channel.queueBind(this.queueName, AMQPConfig.PA_AMQP_FACTORY_EXCHANGE_NAME.getValue(), "");
            this.consumerTag = this.channel.basicConsume(this.queueName, true, new DefaultConsumer(this.channel) { // from class: org.objectweb.proactive.extensions.amqp.remoteobject.AMQPRemoteObjectServer.1
                public void handleDelivery(String str, final Envelope envelope, final AMQP.BasicProperties basicProperties, final byte[] bArr) throws IOException {
                    AMQPRemoteObjectServer.tpe.execute(new Runnable() { // from class: org.objectweb.proactive.extensions.amqp.remoteobject.AMQPRemoteObjectServer.1.1
                        @Override // java.lang.Runnable
                        public void run() {
                            byte[] convert;
                            envelope.getRoutingKey();
                            byte[] bArr2 = null;
                            envelope.getDeliveryTag();
                            AMQP.BasicProperties basicProperties2 = basicProperties;
                            AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().correlationId(basicProperties2.getCorrelationId()).build();
                            try {
                                try {
                                    if (AMQPConfig.PA_AMQP_DISCOVERY_QUEUES_MESSAGE_TYPE.getValue().equals(basicProperties2.getType())) {
                                        convert = AMQPRemoteObjectServer.this.rro.getURI().toString().getBytes();
                                    } else {
                                        Request request = (Request) ByteToObjectConverter.ProActiveObjectStream.convert(bArr);
                                        AMQPRemoteObjectServer.logger.debug(String.format("message %s consumed on queue %s", request.getMethodName(), AMQPRemoteObjectServer.this.queueName));
                                        convert = ObjectToByteConverter.ProActiveObjectStream.convert(AMQPRemoteObjectServer.this.rro.receiveMessage(request));
                                    }
                                    try {
                                        AMQPRemoteObjectServer.this.channel.basicPublish("", basicProperties2.getReplyTo(), build, convert);
                                    } catch (IOException e) {
                                        e.printStackTrace();
                                    }
                                } catch (Exception e2) {
                                    try {
                                        bArr2 = ObjectToByteConverter.ProActiveObjectStream.convert(new SynchronousReplyImpl(new MethodCallResult(null, e2)));
                                    } catch (IOException e3) {
                                        e3.printStackTrace();
                                    }
                                    try {
                                        AMQPRemoteObjectServer.this.channel.basicPublish("", basicProperties2.getReplyTo(), build, bArr2);
                                    } catch (IOException e4) {
                                        e4.printStackTrace();
                                    }
                                }
                            } catch (Throwable th) {
                                try {
                                    AMQPRemoteObjectServer.this.channel.basicPublish("", basicProperties2.getReplyTo(), build, bArr2);
                                } catch (IOException e5) {
                                    e5.printStackTrace();
                                }
                                throw th;
                            }
                        }
                    });
                }
            });
        } catch (IOException e) {
            unbind();
            throw e;
        }
    }

    private void unbind() throws IOException, ProActiveException {
        if (this.queueBind != null) {
            this.channel.queueUnbind(this.queueName, AMQPConfig.PA_AMQP_FACTORY_EXCHANGE_NAME.getValue(), "");
        }
        if (this.ok != null) {
            this.channel.queueDelete(this.queueName);
        }
    }
}
