/*
 * Decompiled with CFR 0.152.
 */
package net.jxta.impl.pipe;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.WeakHashMap;
import net.jxta.document.MimeMediaType;
import net.jxta.document.StructuredDocumentFactory;
import net.jxta.document.XMLDocument;
import net.jxta.endpoint.EndpointAddress;
import net.jxta.endpoint.EndpointListener;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.endpoint.TextDocumentMessageElement;
import net.jxta.id.ID;
import net.jxta.impl.cm.SrdiIndex;
import net.jxta.impl.id.UUID.UUID;
import net.jxta.impl.id.UUID.UUIDFactory;
import net.jxta.impl.pipe.InputPipeImpl;
import net.jxta.impl.pipe.NonBlockingWireOutputPipe;
import net.jxta.impl.pipe.PipeRegistrar;
import net.jxta.impl.pipe.PipeResolver;
import net.jxta.impl.pipe.WireHeader;
import net.jxta.impl.pipe.WirePipeImpl;
import net.jxta.peergroup.PeerGroup;
import net.jxta.pipe.InputPipe;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.rendezvous.RendezVousService;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.Priority;

/**
 * Per-pipe state for a wire pipe on the local peer.
 *
 * <p>An instance keeps track of the input pipes registered for one pipe
 * advertisement, remembers the ids of recently seen messages so duplicates can
 * be discarded, hands incoming messages to the registered input pipes and, on
 * a rendezvous peer, repropagates them through a
 * {@link NonBlockingWireOutputPipe}.
 *
 * <p>Locking: {@link #register}, {@link #forget}, {@link #close} and
 * {@link #finalize} are synchronized on this instance. The set of registered
 * input pipes is additionally a synchronized map so that message delivery can
 * snapshot it without taking the instance monitor. The list of seen message
 * ids is guarded by its own monitor.
 */
public class WirePipe implements EndpointListener, InputPipe, PipeRegistrar {
    private static final Logger LOG = Logger.getLogger(WirePipe.class.getName());

    /** Maximum number of message ids remembered for duplicate detection. */
    private static final int MAX_RECORDED_MSGIDS = 250;

    /** Set once {@link #close()} has started; later calls to close() are no-ops. */
    private volatile boolean closed = false;

    private final PeerGroup myGroup;
    private final PipeResolver pipeResolver;
    private final WirePipeImpl pipeService;
    private final PipeAdvertisement pipeAdv;
    private final RendezVousService rendezvous;

    /** String form of the local peer id, used for loop detection in wire headers. */
    private final String localPeerId;

    /** Output pipe used to forward messages received from other peers. */
    private final NonBlockingWireOutputPipe repropagater;

    /**
     * Registered input pipes, held as weak keys (values are unused). Wrapped in
     * a synchronized map because it is written under the instance monitor but
     * read by {@link #callLocalListeners} without it.
     */
    private final Map<InputPipe, Object> wireinputpipes =
            Collections.synchronizedMap(new WeakHashMap<InputPipe, Object>());

    /**
     * Number of register() calls not yet matched by forget(). Kept separately
     * from the map size; with weak keys the two are not guaranteed to agree.
     */
    private int nbInputPipes = 0;

    /** Ids of recently seen messages, oldest first. Guarded by its own monitor. */
    private final List<UUID> msgIds = new ArrayList<UUID>(MAX_RECORDED_MSGIDS);

    /**
     * Creates the wire pipe state, registers it with the pipe resolver and
     * creates the output pipe used for repropagation.
     *
     * @param g            peer group this pipe lives in
     * @param pipeResolver resolver this pipe registers itself with
     * @param pipeService  wire pipe service that creates the repropagation pipe
     * @param adv          advertisement describing the pipe
     */
    public WirePipe(PeerGroup g, PipeResolver pipeResolver, WirePipeImpl pipeService, PipeAdvertisement adv) {
        this.myGroup = g;
        this.pipeResolver = pipeResolver;
        this.pipeService = pipeService;
        this.pipeAdv = adv;
        this.localPeerId = this.myGroup.getPeerID().toString();
        this.rendezvous = g.getRendezVousService();
        pipeResolver.register(this);
        this.repropagater = pipeService.createOutputPipe(adv, Collections.EMPTY_SET);
    }

    /**
     * Closes the pipe if the owner forgot to, logging a warning in that case.
     */
    @Override
    protected synchronized void finalize() {
        if (!this.closed && LOG.isEnabledFor(Level.WARN)) {
            LOG.warn("Pipe is being finalized without being previously closed. This is likely a bug.");
        }
        this.close();
    }

    /**
     * Adds an input pipe to the set of local receivers. When the first input
     * pipe is registered, an endpoint listener is installed for the pipe id and
     * the pipe is pushed to SRDI.
     *
     * @param wireinputpipe the input pipe to add; must also be an {@link EndpointListener}
     * @return always {@code true}
     */
    public synchronized boolean register(InputPipe wireinputpipe) {
        this.wireinputpipes.put(wireinputpipe, null);
        this.nbInputPipes++;

        if (1 == this.nbInputPipes) {
            if (LOG.isEnabledFor(Level.INFO)) {
                LOG.info("Registering wire pipe with SRDI");
            }

            // NOTE(review): the input pipe itself, not this WirePipe, is installed
            // as the endpoint listener, while forget() compares the removed
            // listener against `this`. Confirm which of the two is intended.
            boolean registered = this.myGroup.getEndpointService().addIncomingMessageListener(
                    (EndpointListener) wireinputpipe, "PipeService", wireinputpipe.getPipeID().toString());
            if (!registered && LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("Existing Registered Endpoint Listener for " + wireinputpipe.getPipeID());
            }

            this.pipeResolver.pushSrdi(this, true);
        }
        return true;
    }

    /**
     * Removes an input pipe from the set of local receivers. When the count of
     * registered input pipes drops to zero, the pipe is withdrawn from SRDI and
     * the endpoint listener for the pipe id is removed.
     *
     * @param wireinputpipe the input pipe to remove
     * @return always {@code true}
     */
    public synchronized boolean forget(InputPipe wireinputpipe) {
        this.wireinputpipes.remove(wireinputpipe);
        // The counter is decremented unconditionally, whether or not the map
        // still held the key.
        this.nbInputPipes--;

        if (0 == this.nbInputPipes) {
            if (LOG.isEnabledFor(Level.INFO)) {
                LOG.info("Deregistering wire pipe with SRDI");
            }
            this.pipeResolver.pushSrdi(this, false);

            EndpointListener removed = this.myGroup.getEndpointService().removeIncomingMessageListener(
                    "PipeService", wireinputpipe.getPipeID().toString());
            // NOTE(review): register() installs the input pipe as listener, so
            // `removed` is not expected to be `this`; this warning looks like it
            // fires on every deregistration. Left unchanged pending confirmation.
            if ((null == removed || this != removed) && LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("removeIncomingMessageListener() did not remove correct pipe!");
            }
        }

        if (this.nbInputPipes < 0) {
            this.nbInputPipes = 0;
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("Number of pipe listeners was < 0");
            }
        }
        return true;
    }

    /**
     * Not supported on this class.
     *
     * @return always {@code null}
     */
    public Message waitForMessage() throws InterruptedException {
        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("This method is not really supported.");
        }
        return null;
    }

    /**
     * Not supported on this class.
     *
     * @param timeout ignored
     * @return always {@code null}
     */
    public Message poll(int timeout) throws InterruptedException {
        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("This method is not really supported.");
        }
        return null;
    }

    /**
     * Unregisters this pipe from the resolver and closes the repropagation
     * pipe. Calling it again after the first call has no effect.
     */
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        // Mark closed first so a failure below cannot cause a second attempt
        // from finalize().
        this.closed = true;
        try {
            // Fields can still be null when close() is reached through
            // finalize() after the constructor threw part-way through.
            if (null != this.pipeResolver) {
                this.pipeResolver.forget(this);
            }
        } finally {
            // Close the output pipe even if unregistering threw.
            if (null != this.repropagater) {
                this.repropagater.close();
            }
        }
    }

    /** @return the type recorded in the pipe advertisement */
    public String getType() {
        return this.pipeAdv.getType();
    }

    /** @return the pipe id recorded in the pipe advertisement */
    public ID getPipeID() {
        return this.pipeAdv.getPipeID();
    }

    /** @return the name recorded in the pipe advertisement */
    public String getName() {
        return this.pipeAdv.getName();
    }

    /** @return the advertisement this pipe was created from */
    public PipeAdvertisement getAdvertisement() {
        return this.pipeAdv;
    }

    /**
     * Endpoint entry point. Extracts and parses the {@code JxtaWireHeader}
     * element of the message and hands over to
     * {@link #processIncomingMessage(Message, WireHeader, EndpointAddress, EndpointAddress)}.
     * Messages without a header, or with a header that cannot be parsed, are
     * dropped.
     *
     * @param message the received message
     * @param srcAddr source address reported by the endpoint
     * @param dstAddr destination address reported by the endpoint
     */
    public void processIncomingMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {
        MessageElement elem = message.getMessageElement("jxta", "JxtaWireHeader");
        if (null == elem) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("No JxtaWireHeader element. Discarding " + message);
            }
            return;
        }

        WireHeader header;
        try {
            XMLDocument doc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(
                    elem.getMimeType(), elem.getStream());
            header = new WireHeader(doc);
        } catch (Exception e) {
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("bad wire header", e);
            }
            return;
        }

        this.processIncomingMessage(message, header, srcAddr, dstAddr);
    }

    /**
     * Handles a message whose wire header has already been parsed. Messages
     * that already list the local peer, or whose id was seen before, are
     * dropped. Otherwise the message is repropagated (rendezvous peers) or
     * delivered to the local input pipes (all other peers).
     *
     * @param message the received message
     * @param header  parsed wire header of the message
     * @param srcAddr source address reported by the endpoint
     * @param dstAddr destination address reported by the endpoint
     */
    void processIncomingMessage(Message message, WireHeader header, EndpointAddress srcAddr, EndpointAddress dstAddr) {
        if (header.containsPeer(this.localPeerId)) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Loopback detected - discarding " + message);
            }
            return;
        }

        if (this.recordSeenMessage(header.getMsgId())) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Discarding duplicate " + message);
            }
            return;
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Processing " + message + " on " + this.pipeAdv.getPipeID());
        }

        if (this.myGroup.isRendezvous()) {
            // NOTE(review): local listeners are not called on this branch.
            // Presumably the repropagater ends up in sendMessage(), which does
            // call them - confirm. If so, a message dropped by repropagate()
            // for lack of TTL never reaches the local listeners.
            this.repropagate(message, header);
        } else {
            this.callLocalListeners(message, srcAddr, dstAddr);
        }
    }

    /**
     * Delivers a private copy of the message to every currently registered
     * input pipe. A failure in one input pipe is logged and does not stop
     * delivery to the others.
     */
    private void callLocalListeners(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {
        EndpointAddress src = (null == srcAddr) ? null : EndpointAddress.unmodifiableEndpointAddress(srcAddr);
        EndpointAddress dst = (null == dstAddr) ? null : EndpointAddress.unmodifiableEndpointAddress(dstAddr);

        // Snapshot under the map's monitor; the callbacks themselves run
        // without any lock held so they may register/forget freely.
        InputPipe[] targets;
        synchronized (this.wireinputpipes) {
            targets = this.wireinputpipes.keySet().toArray(new InputPipe[0]);
        }

        for (int i = 0; i < targets.length; i++) {
            InputPipeImpl anInputPipe = (InputPipeImpl) targets[i];
            Message tmpMsg = (Message) message.clone();
            try {
                anInputPipe.processIncomingMessage(tmpMsg, src, dst);
            } catch (Throwable caught) {
                if (LOG.isEnabledFor(Level.ERROR)) {
                    LOG.error("Uncaught Throwable during callback (" + anInputPipe + ") for " + anInputPipe.getPipeID(), caught);
                }
            }
        }
    }

    /**
     * Forwards a copy of the message through the repropagation pipe. The
     * header's TTL is decremented and the local peer is added to it before
     * the copy is queued. Messages with a TTL of 1 or less are dropped.
     * Queueing failures are logged, not thrown.
     *
     * @param message the message to forward
     * @param header  wire header of the message; modified by this call
     */
    void repropagate(Message message, WireHeader header) {
        if (header.getTTL() <= 1) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("No TTL remaining - discarding " + message + " on " + header.getPipeID());
            }
            return;
        }

        Message msg = (Message) message.clone();
        header.setTTL(header.getTTL() - 1);
        header.addPeer(this.localPeerId);

        // Replace the header element of the copy with the updated header.
        XMLDocument headerDoc = (XMLDocument) header.getDocument(MimeMediaType.XMLUTF8);
        TextDocumentMessageElement elem = new TextDocumentMessageElement("JxtaWireHeader", headerDoc, null);
        msg.replaceMessageElement("jxta", elem);

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Repropagating " + msg + " on " + header.getPipeID());
        }

        try {
            if (!this.repropagater.enqueue(msg) && LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("Failure repropagating " + msg + " on " + header.getPipeID() + ". Could not queue message.");
            }
        } catch (IOException failed) {
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("Failure repropagating " + msg + " on " + header.getPipeID(), failed);
            }
        }
    }

    /**
     * Sends a message on this pipe.
     *
     * <p>Local input pipes receive the message when {@code peers} is empty or
     * contains the local peer id. With an empty {@code peers} set the message
     * is then propagated through the rendezvous service; a rendezvous peer
     * first propagates to the connected peers found in the SRDI index for this
     * pipe and then walks the message. With a non-empty set the message is
     * propagated to exactly those peers.
     *
     * @param msg   the message to send
     * @param peers destination peer ids; empty means "everyone"
     * @throws IOException if the rendezvous service fails to send
     */
    void sendMessage(Message msg, Set peers) throws IOException {
        if (peers.isEmpty() || peers.contains(this.myGroup.getPeerID())) {
            this.callLocalListeners(msg, null, null);
        }

        if (!peers.isEmpty()) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Propagating " + msg + " to " + peers.size() + " peers.");
            }
            this.rendezvous.propagate(Collections.enumeration(peers), msg, "jxta.service.wirepipe", this.pipeService.getServiceParameter(), 1);
            return;
        }

        if (this.myGroup.isRendezvous()) {
            SrdiIndex srdiIndex = this.pipeResolver.getSrdiIndex();
            Vector peerids = srdiIndex.query("JxtaPropagate", "Id", this.getPipeID().toString(), Integer.MAX_VALUE);
            // Only peers that are both indexed for this pipe and currently connected.
            peerids.retainAll(this.rendezvous.getConnectedPeerIDs());
            this.rendezvous.propagate(Collections.enumeration(peerids), (Message) msg.clone(), "jxta.service.wirepipe", this.pipeService.getServiceParameter(), 1);
            this.rendezvous.walk(msg, "jxta.service.wirepipe", this.pipeService.getServiceParameter(), Integer.MAX_VALUE);
        } else {
            this.rendezvous.propagate(msg, "jxta.service.wirepipe", this.pipeService.getServiceParameter(), Integer.MAX_VALUE);
        }
    }

    /**
     * @return a new message id, the string form of a sequential UUID
     */
    String createMsgId() {
        return UUIDFactory.newSeqUUID().toString();
    }

    /**
     * Converts a message id string into a UUID. The id is used directly when
     * it parses as a UUID; otherwise a hash UUID is derived from its numeric
     * value, or from its {@code hashCode()} when it is not a number either.
     */
    private static UUID toMsgUuid(String id) {
        try {
            return new UUID(id);
        } catch (IllegalArgumentException notauuid) {
            try {
                return UUIDFactory.newHashUUID(Long.parseLong(id), 0L);
            } catch (NumberFormatException notanumber) {
                return UUIDFactory.newHashUUID(id.hashCode(), 0L);
            }
        }
    }

    /**
     * Records a message id as seen, evicting the oldest recorded id when the
     * list is full.
     *
     * @param id message id from the wire header
     * @return {@code true} if the id had already been recorded (duplicate),
     *         {@code false} if it was newly added
     */
    private boolean recordSeenMessage(String id) {
        UUID msgid = toMsgUuid(id);

        synchronized (this.msgIds) {
            if (this.msgIds.contains(msgid)) {
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("duplicate " + msgid);
                }
                return true;
            }

            if (this.msgIds.size() >= MAX_RECORDED_MSGIDS) {
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("Remove oldest id");
                }
                this.msgIds.remove(0);
            }
            this.msgIds.add(msgid);
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("added " + msgid);
        }
        return false;
    }

    /**
     * Tells whether a message id has been recorded, without recording it.
     *
     * @param id message id from the wire header
     * @return {@code true} if the id is currently in the list of seen ids
     */
    private boolean checkMessageSeen(String id) {
        UUID msgid = toMsgUuid(id);

        boolean found;
        synchronized (this.msgIds) {
            found = this.msgIds.contains(msgid);
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("id='" + msgid + "' (" + found + ")");
        }
        return found;
    }
}

