/*
 * Decompiled with CFR 0.152.
 */
package net.jxta.impl.pipe;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import net.jxta.document.MimeMediaType;
import net.jxta.document.XMLDocument;
import net.jxta.endpoint.Message;
import net.jxta.endpoint.TextDocumentMessageElement;
import net.jxta.id.ID;
import net.jxta.impl.pipe.WireHeader;
import net.jxta.impl.pipe.WirePipe;
import net.jxta.impl.util.UnbiasedQueue;
import net.jxta.peergroup.PeerGroup;
import net.jxta.pipe.OutputPipe;
import net.jxta.protocol.PipeAdvertisement;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.Priority;

class NonBlockingWireOutputPipe
implements OutputPipe,
Runnable {
    private static final Logger LOG = Logger.getLogger((String)NonBlockingWireOutputPipe.class.getName());
    private static final long IDLEWORKERLINGER = 10000L;
    private volatile boolean closed = false;
    private PeerGroup myGroup = null;
    private WirePipe wire = null;
    private PipeAdvertisement pAdv = null;
    private Set destPeers = null;
    private volatile Thread serviceThread = null;
    private UnbiasedQueue queue = UnbiasedQueue.synchronizedQueue(new UnbiasedQueue(50, false));
    private workerState workerstate;

    public NonBlockingWireOutputPipe(PeerGroup g, WirePipe wire, PipeAdvertisement pAdv, Set peers) {
        this.myGroup = g;
        this.wire = wire;
        this.destPeers = new HashSet(peers);
        this.pAdv = pAdv;
        if (LOG.isEnabledFor((Priority)Level.INFO)) {
            LOG.info((Object)("Constructing for " + this.getPipeID()));
        }
        this.workerstate = workerState.SENDMESSAGES;
    }

    protected void finalize() {
        this.close();
    }

    public synchronized void close() {
        if (!this.closed) {
            if (LOG.isEnabledFor((Priority)Level.INFO)) {
                LOG.info((Object)("Closing queue for " + this.getPipeID()));
            }
            this.queue.close();
        }
        this.closed = true;
    }

    public boolean isClosed() {
        return this.closed;
    }

    public final String getType() {
        return this.pAdv.getType();
    }

    public final ID getPipeID() {
        return this.pAdv.getPipeID();
    }

    public final String getName() {
        return this.pAdv.getName();
    }

    public final PipeAdvertisement getAdvertisement() {
        return this.pAdv;
    }

    public boolean send(Message message) throws IOException {
        WireHeader header = new WireHeader();
        header.setPipeID(this.getPipeID());
        header.setTTL(this.destPeers.isEmpty() ? 200 : 1);
        header.setMsgId(this.wire.createMsgId());
        header.addPeer(this.myGroup.getPeerID().toString());
        XMLDocument asDoc = (XMLDocument)header.getDocument(MimeMediaType.XMLUTF8);
        TextDocumentMessageElement elem = new TextDocumentMessageElement("JxtaWireHeader", asDoc, null);
        Message msg = (Message)message.clone();
        msg.replaceMessageElement("jxta", elem);
        return this.enqueue(msg);
    }

    boolean enqueue(Message message) throws IOException {
        if (LOG.isEnabledFor((Priority)Level.DEBUG)) {
            LOG.debug((Object)("Queuing " + message + " for pipe " + this.getPipeID()));
        }
        boolean pushed = false;
        while (!this.queue.isClosed()) {
            try {
                pushed = this.queue.push(message, 250L);
                break;
            }
            catch (InterruptedException woken) {
                Thread.interrupted();
            }
        }
        if (!pushed && this.queue.isClosed()) {
            IOException failed = new IOException("Could not enqueue " + message + " for sending. Pipe is closed.");
            if (LOG.isEnabledFor((Priority)Level.ERROR)) {
                LOG.error((Object)failed, (Throwable)failed);
            }
            throw failed;
        }
        this.startServiceThread();
        return pushed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void run() {
        block14: while (true) {
            try {
                try {}
                catch (Throwable all) {
                    if (LOG.isEnabledFor((Priority)Level.ERROR)) {
                        LOG.error((Object)("Uncaught Throwable in thread :" + Thread.currentThread().getName()), all);
                    }
                    NonBlockingWireOutputPipe nonBlockingWireOutputPipe = this;
                    synchronized (nonBlockingWireOutputPipe) {
                        if (this.serviceThread == Thread.currentThread()) {
                            this.serviceThread = null;
                        }
                    }
                    Object var6_8 = null;
                    if (!LOG.isEnabledFor((Priority)Level.INFO)) return;
                    LOG.info((Object)("Thread exit : " + Thread.currentThread().getName() + "\n\tworker state : " + this.workerstate + "\tqueue closed : " + this.queue.isClosed() + "\tnumber in queue : " + this.queue.getCurrentInQueue() + "\tnumber queued : " + this.queue.getNumEnqueued() + "\tnumber dequeued : " + this.queue.getNumDequeued()));
                    return;
                }
            }
            catch (Throwable throwable) {
                Object var6_9 = null;
                if (!LOG.isEnabledFor((Priority)Level.INFO)) throw throwable;
                LOG.info((Object)("Thread exit : " + Thread.currentThread().getName() + "\n\tworker state : " + this.workerstate + "\tqueue closed : " + this.queue.isClosed() + "\tnumber in queue : " + this.queue.getCurrentInQueue() + "\tnumber queued : " + this.queue.getNumEnqueued() + "\tnumber dequeued : " + this.queue.getNumDequeued()));
                throw throwable;
            }
            while (workerState.CLOSED != this.workerstate) {
                NonBlockingWireOutputPipe nonBlockingWireOutputPipe = this;
                synchronized (nonBlockingWireOutputPipe) {
                    LOG.debug((Object)("NON-BLOCKING WORKER AT STATE : " + this.workerstate));
                    if (workerState.SENDMESSAGES != this.workerstate) {
                        if (workerState.CLOSED == this.workerstate) {
                            this.queue.clear();
                            this.serviceThread = null;
                            break block14;
                        }
                        LOG.warn((Object)("Unrecognized state in worker thread : " + this.workerstate));
                    }
                    if (workerState.SENDMESSAGES != this.workerstate) continue;
                }
                Message msg = null;
                try {
                    msg = (Message)this.queue.pop(10000L);
                }
                catch (InterruptedException woken) {
                    Thread.interrupted();
                    continue;
                }
                if (null == msg) {
                    NonBlockingWireOutputPipe woken = this;
                    synchronized (woken) {
                        if (null == this.queue.peek()) {
                            if (this.closed) {
                                this.workerstate = workerState.CLOSED;
                                continue;
                            }
                            this.serviceThread = null;
                            break block14;
                        }
                        continue;
                    }
                }
                if (LOG.isEnabledFor((Priority)Level.DEBUG)) {
                    LOG.debug((Object)("Sending " + msg + " on " + this.getPipeID()));
                }
                try {
                    this.wire.sendMessage(msg, this.destPeers);
                }
                catch (IOException failed) {
                    if (!LOG.isEnabledFor((Priority)Level.ERROR)) continue;
                    LOG.error((Object)("Failed sending " + msg + " on " + this.getPipeID()), (Throwable)failed);
                    continue;
                }
                continue block14;
            }
            break;
        }
        Object var6_7 = null;
        if (!LOG.isEnabledFor((Priority)Level.INFO)) return;
        LOG.info((Object)("Thread exit : " + Thread.currentThread().getName() + "\n\tworker state : " + this.workerstate + "\tqueue closed : " + this.queue.isClosed() + "\tnumber in queue : " + this.queue.getCurrentInQueue() + "\tnumber queued : " + this.queue.getNumEnqueued() + "\tnumber dequeued : " + this.queue.getNumDequeued()));
    }

    private synchronized void startServiceThread() {
        if (null == this.serviceThread && !this.closed) {
            this.serviceThread = new Thread(this.myGroup.getHomeThreadGroup(), this, "Worker Thread for NonBlockingWireOutputPipe : " + this.getPipeID());
            this.serviceThread.setDaemon(true);
            this.serviceThread.start();
            if (LOG.isEnabledFor((Priority)Level.INFO)) {
                LOG.info((Object)("Thread start : " + this.serviceThread.getName() + "\n\tworker state : " + this.workerstate + "\tqueue closed : " + this.queue.isClosed() + "\tnumber in queue : " + this.queue.getCurrentInQueue() + "\tnumber queued : " + this.queue.getNumEnqueued() + "\tnumber dequeued : " + this.queue.getNumDequeued()));
            }
        }
    }

    static class workerState {
        public static final workerState SENDMESSAGES = new workerState(){

            public String toString() {
                return "SENDMESSAGES";
            }
        };
        public static final workerState CLOSED = new workerState(){

            public String toString() {
                return "CLOSED";
            }
        };

        private workerState() {
        }
    }
}

