package reactor.io.net.impl.zmq;

import com.gs.collections.api.list.MutableList;
import com.gs.collections.impl.block.predicate.checked.CheckedPredicate;
import com.gs.collections.impl.list.mutable.FastList;
import com.gs.collections.impl.list.mutable.SynchronizedMutableList;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Subscriber;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.fn.Consumer;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.Channel;
import reactor.io.net.ChannelStream;
import reactor.io.net.PeerStream;

/* loaded from: input_file:reactor/io/net/impl/zmq/ZeroMQChannelStream.class */
public class ZeroMQChannelStream<IN, OUT> extends ChannelStream<IN, OUT> {
    private final ZeroMQChannelStream<IN, OUT>.ZeroMQConsumerSpec eventSpec;
    private final MutableList<Consumer<Void>> closeHandlers;
    private volatile String connectionId;
    private volatile ZMQ.Socket socket;
    private ZMsg currentMsg;

    /* loaded from: input_file:reactor/io/net/impl/zmq/ZeroMQChannelStream$ZeroMQConsumerSpec.class */
    private class ZeroMQConsumerSpec implements Channel.ConsumerSpec {
        private ZeroMQConsumerSpec() {
        }

        @Override // reactor.io.net.Channel.ConsumerSpec
        public Channel.ConsumerSpec close(Consumer<Void> consumer) {
            ZeroMQChannelStream.this.closeHandlers.add(consumer);
            return this;
        }

        @Override // reactor.io.net.Channel.ConsumerSpec
        public Channel.ConsumerSpec readIdle(long j, Consumer<Void> consumer) {
            return this;
        }

        @Override // reactor.io.net.Channel.ConsumerSpec
        public Channel.ConsumerSpec writeIdle(long j, Consumer<Void> consumer) {
            return this;
        }
    }

    public ZeroMQChannelStream(@Nonnull Environment environment, long j, PeerStream<IN, OUT, ChannelStream<IN, OUT>> peerStream, @Nonnull Dispatcher dispatcher, @Nonnull Dispatcher dispatcher2, @Nullable Codec<Buffer, IN, OUT> codec) {
        super(environment, codec, j, peerStream, dispatcher2, dispatcher);
        this.eventSpec = new ZeroMQConsumerSpec();
        this.closeHandlers = SynchronizedMutableList.of(FastList.newList());
    }

    public ZeroMQChannelStream<IN, OUT> setConnectionId(String str) {
        this.connectionId = str;
        return this;
    }

    public ZeroMQChannelStream<IN, OUT> setSocket(ZMQ.Socket socket) {
        this.socket = socket;
        return this;
    }

    @Override // reactor.io.net.Channel
    public InetSocketAddress remoteAddress() {
        return null;
    }

    @Override // reactor.io.net.ChannelStream
    protected void write(ByteBuffer byteBuffer, Subscriber<?> subscriber, boolean z) {
        ZMsg zMsg;
        boolean z2;
        byte[] bArr = new byte[byteBuffer.remaining()];
        byteBuffer.get(bArr);
        synchronized (this) {
            zMsg = this.currentMsg;
            this.currentMsg = new ZMsg();
            if (zMsg == null) {
                zMsg = this.currentMsg;
                z2 = true;
            } else {
                z2 = false;
            }
        }
        if (z2) {
            switch (this.socket.getType()) {
                case 6:
                    zMsg.add(new ZFrame(this.connectionId));
                    break;
            }
        }
        zMsg.add(new ZFrame(bArr));
        if (z) {
            doFlush(subscriber);
        }
    }

    @Override // reactor.io.net.ChannelStream
    protected void write(Buffer buffer, Subscriber<?> subscriber, boolean z) {
        write(buffer.byteBuffer(), subscriber, z);
    }

    @Override // reactor.io.net.ChannelStream
    protected void write(Object obj, Subscriber<?> subscriber, boolean z) {
        write(((Buffer) getEncoder().apply(obj)).byteBuffer(), subscriber, z);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.io.net.ChannelStream
    public synchronized void flush() {
        doFlush(null);
    }

    private void doFlush(Subscriber<?> subscriber) {
        ZMsg zMsg;
        synchronized (this) {
            zMsg = this.currentMsg;
            this.currentMsg = null;
        }
        if (null != zMsg) {
            boolean send = zMsg.send(this.socket);
            if (null != subscriber) {
                if (send) {
                    subscriber.onComplete();
                } else {
                    subscriber.onError(new RuntimeException("ZeroMQ Message could not be sent"));
                }
            }
        }
    }

    public void close() {
        getDispatcher().dispatch((Object) null, new Consumer<Void>() { // from class: reactor.io.net.impl.zmq.ZeroMQChannelStream.1
            public void accept(Void r6) {
                ZeroMQChannelStream.this.closeHandlers.removeIf(new CheckedPredicate<Consumer<Void>>() { // from class: reactor.io.net.impl.zmq.ZeroMQChannelStream.1.1
                    public boolean safeAccept(Consumer<Void> consumer) throws Exception {
                        consumer.accept((Object) null);
                        return true;
                    }
                });
            }
        }, (Consumer) null);
    }

    @Override // reactor.io.net.Channel
    public Channel.ConsumerSpec on() {
        return this.eventSpec;
    }

    @Override // reactor.io.net.ChannelStream
    public ZMQ.Socket delegate() {
        return this.socket;
    }

    public String toString() {
        return "ZeroMQNetChannel{closeHandlers=" + this.closeHandlers + ", connectionId='" + this.connectionId + "', socket=" + this.socket + '}';
    }
}
