package reactor.io.net.impl.zmq;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.processor.CancelException;
import reactor.core.support.Exceptions;
import reactor.fn.Consumer;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannel;
import reactor.rx.Stream;
import reactor.rx.action.support.DefaultSubscriber;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.subscription.PushSubscription;

/* loaded from: input_file:reactor/io/net/impl/zmq/ZeroMQChannelStream.class */
public class ZeroMQChannelStream<IN, OUT> extends ChannelStream<IN, OUT> {
    private final ZeroMQConsumerSpec eventSpec;
    private final InetSocketAddress remoteAddress;
    private volatile String connectionId;
    private volatile ZMQ.Socket socket;
    private Subscriber<? super IN> inputSub;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/io/net/impl/zmq/ZeroMQChannelStream$ZeroMQConsumerSpec.class */
    public static class ZeroMQConsumerSpec implements ReactorChannel.ConsumerSpec {
        final List<Consumer<Void>> closeHandlers;

        private ZeroMQConsumerSpec() {
            this.closeHandlers = new ArrayList();
        }

        @Override // reactor.io.net.ReactorChannel.ConsumerSpec
        public ReactorChannel.ConsumerSpec close(Consumer<Void> consumer) {
            synchronized (this.closeHandlers) {
                this.closeHandlers.add(consumer);
            }
            return this;
        }

        @Override // reactor.io.net.ReactorChannel.ConsumerSpec
        public ReactorChannel.ConsumerSpec readIdle(long j, Consumer<Void> consumer) {
            return this;
        }

        @Override // reactor.io.net.ReactorChannel.ConsumerSpec
        public ReactorChannel.ConsumerSpec writeIdle(long j, Consumer<Void> consumer) {
            return this;
        }
    }

    public ZeroMQChannelStream(Environment environment, long j, Dispatcher dispatcher, InetSocketAddress inetSocketAddress, Codec<Buffer, IN, OUT> codec) {
        super(environment, codec, j, dispatcher);
        this.eventSpec = new ZeroMQConsumerSpec();
        this.remoteAddress = inetSocketAddress;
    }

    @Override // reactor.io.net.ChannelStream
    protected void doSubscribeWriter(Publisher<? extends OUT> publisher, final Subscriber<? super Void> subscriber) {
        publisher.subscribe(new DefaultSubscriber<OUT>() { // from class: reactor.io.net.impl.zmq.ZeroMQChannelStream.1
            ZMsg currentMsg;

            public void onSubscribe(final Subscription subscription) {
                ZeroMQChannelStream.this.eventSpec.close(new Consumer<Void>() { // from class: reactor.io.net.impl.zmq.ZeroMQChannelStream.1.1
                    public void accept(Void r3) {
                        subscription.cancel();
                    }
                });
                subscription.request(Long.MAX_VALUE);
                subscriber.onSubscribe(Broadcaster.HOT_SUBSCRIPTION);
            }

            public void onNext(OUT out) {
                ByteBuffer byteBuffer;
                boolean z;
                if (Buffer.class.isAssignableFrom(out.getClass())) {
                    byteBuffer = ((Buffer) out).byteBuffer();
                } else {
                    if (ZeroMQChannelStream.this.getEncoder() == null) {
                        subscriber.onError(Exceptions.addValueAsLastCause(new IllegalArgumentException("Data cannot be encoded"), out));
                        return;
                    }
                    byteBuffer = ((Buffer) ZeroMQChannelStream.this.getEncoder().apply(out)).byteBuffer();
                }
                byte[] bArr = new byte[byteBuffer.remaining()];
                byteBuffer.get(bArr);
                ZMsg zMsg = this.currentMsg;
                this.currentMsg = new ZMsg();
                if (zMsg == null) {
                    zMsg = this.currentMsg;
                    z = true;
                } else {
                    z = false;
                }
                if (z) {
                    switch (ZeroMQChannelStream.this.socket.getType()) {
                        case 6:
                            zMsg.add(new ZFrame(ZeroMQChannelStream.this.connectionId));
                            break;
                    }
                }
                zMsg.add(new ZFrame(bArr));
            }

            public void onError(Throwable th) {
                ZeroMQChannelStream.this.doFlush(this.currentMsg, null);
                subscriber.onError(th);
            }

            public void onComplete() {
                ZeroMQChannelStream.this.doFlush(this.currentMsg, subscriber);
            }
        });
    }

    @Override // reactor.io.net.ChannelStream
    public void doDecoded(IN in) {
        try {
            if (this.inputSub != null) {
                this.inputSub.onNext(in);
            }
        } catch (CancelException e) {
        }
    }

    public void subscribe(Subscriber<? super IN> subscriber) {
        if (subscriber == null) {
            throw new IllegalStateException("Input Subscriber cannot be null");
        }
        synchronized (this) {
            if (this.inputSub != null) {
                return;
            }
            this.inputSub = subscriber;
            this.inputSub.onSubscribe(new PushSubscription((Stream) null, this.inputSub));
        }
    }

    public ZeroMQChannelStream<IN, OUT> setConnectionId(String str) {
        this.connectionId = str;
        return this;
    }

    public ZeroMQChannelStream<IN, OUT> setSocket(ZMQ.Socket socket) {
        this.socket = socket;
        return this;
    }

    @Override // reactor.io.net.ReactorChannel
    public InetSocketAddress remoteAddress() {
        return this.remoteAddress;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doFlush(ZMsg zMsg, Subscriber<? super Void> subscriber) {
        if (null != zMsg) {
            boolean send = zMsg.send(this.socket);
            if (null != subscriber) {
                if (send) {
                    subscriber.onComplete();
                } else {
                    subscriber.onError(new RuntimeException("ZeroMQ Message could not be sent"));
                }
            }
        }
    }

    public void close() {
        getDispatcher().dispatch((Object) null, new Consumer<Void>() { // from class: reactor.io.net.impl.zmq.ZeroMQChannelStream.2
            public void accept(Void r5) {
                ArrayList arrayList;
                try {
                    synchronized (ZeroMQChannelStream.this.eventSpec.closeHandlers) {
                        arrayList = new ArrayList(ZeroMQChannelStream.this.eventSpec.closeHandlers);
                    }
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        ((Consumer) it.next()).accept((Object) null);
                    }
                } catch (Throwable th) {
                    if (ZeroMQChannelStream.this.inputSub != null) {
                        ZeroMQChannelStream.this.inputSub.onError(th);
                    }
                }
            }
        }, (Consumer) null);
    }

    @Override // reactor.io.net.ReactorChannel
    public ReactorChannel.ConsumerSpec on() {
        return this.eventSpec;
    }

    @Override // reactor.io.net.ChannelStream
    public ZMQ.Socket delegate() {
        return this.socket;
    }

    public String toString() {
        return "ZeroMQNetChannel{closeHandlers=" + this.eventSpec.closeHandlers + ", connectionId='" + this.connectionId + "', socket=" + this.socket + '}';
    }
}
