/*
 * Decompiled with CFR 0.152.
 */
package org.ow2.petals.se.eip.patterns;

import java.io.InputStream;
import java.util.List;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamSource;
import org.ow2.petals.commons.exception.ExceptionUtil;
import org.ow2.petals.component.framework.api.exception.PEtALSCDKException;
import org.ow2.petals.component.framework.api.message.Exchange;
import org.ow2.petals.component.framework.jbidescriptor.generated.Consumes;
import org.ow2.petals.component.framework.util.ExchangeUtil;
import org.ow2.petals.se.eip.ExchangeContext;
import org.ow2.petals.se.eip.async.CommonAsyncContext;
import org.ow2.petals.se.eip.patterns.AbstractAggregatorPattern;
import org.ow2.petals.se.eip.patterns.PatternHelper;

public class ScatterGather
extends AbstractAggregatorPattern {
    private static final String SCATTER_GATHER_NAMESPACE = "http://petals.ow2.org/components/eip/version-2/scatter-gather";

    @Override
    public String getName() {
        return "Scatter-Gather";
    }

    @Override
    public String getNameSpace() {
        return SCATTER_GATHER_NAMESPACE;
    }

    @Override
    public void init() {
    }

    @Override
    public boolean process(Exchange exchange, ExchangeContext context) {
        boolean reply = false;
        try {
            List<Consumes> consumesList = context.getSUConsumes(exchange.getEndpoint());
            if (consumesList != null && consumesList.size() != 0) {
                this.sendAsyncExchanges(consumesList, exchange, context);
            } else {
                exchange.setError((Exception)((Object)new MessagingException(this.getName() + ": the Service Unit must define at least 1 Consumes section(s)")));
                reply = true;
            }
        }
        catch (MessagingException e) {
            exchange.setError((Exception)((Object)e));
            reply = true;
        }
        catch (PEtALSCDKException e) {
            exchange.setError((Exception)((Object)new MessagingException(ExceptionUtil.getExtendedMessage((Exception)((Object)e)))));
            reply = true;
        }
        return reply;
    }

    private void sendAsyncExchanges(List<Consumes> consumesList, Exchange exchange, ExchangeContext context) throws MessagingException, PEtALSCDKException {
        boolean faultRobust = this.isFaultRobust(context.getExtensions());
        boolean exceptionRobust = this.isExceptionRobust(context.getExtensions());
        CommonAsyncContext asyncContext = new CommonAsyncContext(exchange, 600000L, this.getName(), consumesList.size(), faultRobust, exceptionRobust, false);
        NormalizedMessage inMessage = exchange.getInMessage();
        InputStream inMessageAsStream = PatternHelper.handleStreamSource(inMessage);
        for (Consumes consumedService : consumesList) {
            Exchange tmpExchange = context.createConsumeExchange(consumedService);
            if (tmpExchange.getOperation() == null) {
                tmpExchange.setOperation(asyncContext.getOriginalExchange().getOperation());
            }
            PatternHelper.copy(inMessage, tmpExchange.getInMessage());
            ExchangeUtil.copyExchangeProperties((Exchange)exchange, (Exchange)tmpExchange);
            this.logSend(consumedService, tmpExchange);
            context.sendAsync(tmpExchange, asyncContext);
            if (inMessageAsStream == null) continue;
            inMessage.setContent((Source)new StreamSource(inMessageAsStream));
            inMessageAsStream = PatternHelper.handleStreamSource(inMessage);
        }
    }
}

