package reactor.io.net.impl.zmq.tcp;

import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.core.support.UUIDUtils;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.config.ServerSocketOptions;
import reactor.io.net.config.SslOptions;
import reactor.io.net.impl.zmq.ZeroMQChannelStream;
import reactor.io.net.impl.zmq.ZeroMQServerSocketOptions;
import reactor.io.net.impl.zmq.ZeroMQWorker;
import reactor.io.net.tcp.TcpServer;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Stream;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.broadcast.SerializedBroadcaster;
import reactor.rx.stream.GroupedStream;

/* loaded from: input_file:reactor/io/net/impl/zmq/tcp/ZeroMQTcpServer.class */
public class ZeroMQTcpServer<IN, OUT> extends TcpServer<IN, OUT> {
    private final Logger log;
    private final int ioThreadCount;
    private final ZeroMQServerSocketOptions zmqOpts;
    private final ExecutorService threadPool;
    private volatile ZeroMQWorker worker;
    private volatile Future<?> workerFuture;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: reactor.io.net.impl.zmq.tcp.ZeroMQTcpServer$2, reason: invalid class name */
    /* loaded from: input_file:reactor/io/net/impl/zmq/tcp/ZeroMQTcpServer$2.class */
    public class AnonymousClass2 extends ZeroMQWorker {
        final /* synthetic */ Stream val$grouped;
        final /* synthetic */ Promise val$promise;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        AnonymousClass2(UUID uuid, int i, int i2, ZContext zContext, Broadcaster broadcaster, Stream stream, Promise promise) {
            super(uuid, i, i2, zContext, broadcaster);
            this.val$grouped = stream;
            this.val$promise = promise;
        }

        @Override // reactor.io.net.impl.zmq.ZeroMQWorker
        protected void configure(ZMQ.Socket socket) {
            socket.setReceiveBufferSize(ZeroMQTcpServer.this.getOptions().rcvbuf());
            socket.setSendBufferSize(ZeroMQTcpServer.this.getOptions().sndbuf());
            socket.setBacklog(ZeroMQTcpServer.this.getOptions().backlog());
            if (ZeroMQTcpServer.this.getOptions().keepAlive()) {
                socket.setTCPKeepAlive(1);
            }
            if (null == ZeroMQTcpServer.this.zmqOpts || null == ZeroMQTcpServer.this.zmqOpts.socketConfigurer()) {
                return;
            }
            ZeroMQTcpServer.this.zmqOpts.socketConfigurer().accept(socket);
        }

        @Override // reactor.io.net.impl.zmq.ZeroMQWorker
        protected void start(final ZMQ.Socket socket) {
            try {
                String listenAddresses = (null == ZeroMQTcpServer.this.zmqOpts || null == ZeroMQTcpServer.this.zmqOpts.listenAddresses()) ? "tcp://" + ZeroMQTcpServer.this.getListenAddress().getHostString() + ":" + ZeroMQTcpServer.this.getListenAddress().getPort() : ZeroMQTcpServer.this.zmqOpts.listenAddresses();
                if (ZeroMQTcpServer.this.log.isInfoEnabled()) {
                    ZeroMQTcpServer.this.log.info("BIND: starting ZeroMQ {} socket on {}", ZeroMQ.findSocketTypeName(socket.getType()), listenAddresses);
                }
                socket.bind(listenAddresses);
                this.val$grouped.consume(new Consumer<GroupedStream<String, ZMsg>>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQTcpServer.2.1
                    public void accept(GroupedStream<String, ZMsg> groupedStream) {
                        final ZeroMQChannelStream<IN, OUT> socket2 = ZeroMQTcpServer.this.bindChannel((Object) null, null != ZeroMQTcpServer.this.zmqOpts ? ZeroMQTcpServer.this.zmqOpts.prefetch() : -1L).setConnectionId((String) groupedStream.key()).setSocket(socket);
                        socket2.registerOnPeer();
                        groupedStream.consume(new Consumer<ZMsg>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQTcpServer.2.1.1
                            public void accept(ZMsg zMsg) {
                                while (true) {
                                    ZFrame pop = zMsg.pop();
                                    if (null == pop) {
                                        zMsg.destroy();
                                        return;
                                    } else if (socket2.getDecoder() != null) {
                                        socket2.getDecoder().apply(Buffer.wrap(pop.getData()));
                                    } else {
                                        socket2.in().onNext(Buffer.wrap(pop.getData()));
                                    }
                                }
                            }
                        }, ZeroMQTcpServer.this.createErrorConsumer(socket2), new Consumer<Void>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQTcpServer.2.1.2
                            public void accept(Void r4) {
                                try {
                                    socket2.close();
                                } catch (Throwable th) {
                                    ZeroMQTcpServer.this.notifyError(th);
                                }
                            }
                        });
                    }
                });
                this.val$promise.onNext(true);
            } catch (Exception e) {
                this.val$promise.onError(e);
            }
        }
    }

    public ZeroMQTcpServer(@Nonnull Environment environment, @Nonnull Dispatcher dispatcher, @Nullable InetSocketAddress inetSocketAddress, ServerSocketOptions serverSocketOptions, SslOptions sslOptions, @Nullable Codec<Buffer, IN, OUT> codec) {
        super(environment, dispatcher, inetSocketAddress, serverSocketOptions, sslOptions, codec);
        this.log = LoggerFactory.getLogger(getClass());
        this.ioThreadCount = ((Integer) getEnvironment().getProperty("reactor.zmq.ioThreadCount", Integer.class, 1)).intValue();
        if (serverSocketOptions instanceof ZeroMQServerSocketOptions) {
            this.zmqOpts = (ZeroMQServerSocketOptions) serverSocketOptions;
        } else {
            this.zmqOpts = null;
        }
        this.threadPool = Executors.newCachedThreadPool(new NamedDaemonThreadFactory("zmq-server"));
    }

    @Override // reactor.io.net.Server
    public Promise<Boolean> start() {
        Assert.isNull(this.worker, "This ZeroMQ server has already been started");
        Promise<Boolean> ready = Promises.ready(getEnvironment(), getDispatcher());
        final UUID random = UUIDUtils.random();
        final int socketType = null != this.zmqOpts ? this.zmqOpts.socketType() : 6;
        ZContext context = null != this.zmqOpts ? this.zmqOpts.context() : null;
        Broadcaster create = SerializedBroadcaster.create(getEnvironment());
        this.worker = new AnonymousClass2(random, socketType, this.ioThreadCount, context, create, create.groupBy(new Function<ZMsg, String>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQTcpServer.1
            public String apply(ZMsg zMsg) {
                String uuid;
                switch (socketType) {
                    case 6:
                        uuid = zMsg.popString();
                        break;
                    default:
                        uuid = random.toString();
                        break;
                }
                return uuid;
            }
        }), ready);
        this.workerFuture = this.threadPool.submit(this.worker);
        return ready;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.io.net.PeerStream
    public ZeroMQChannelStream<IN, OUT> bindChannel(Object obj, long j) {
        return new ZeroMQChannelStream<>(getEnvironment(), j == -1 ? getPrefetchSize() : j, this, getDispatcher(), getDispatcher(), getDefaultCodec());
    }

    @Override // reactor.io.net.Server
    public Promise<Boolean> shutdown() {
        if (null == this.worker) {
            return Promises.error(new IllegalStateException("This ZeroMQ server has not been started"));
        }
        Promise<Boolean> ready = Promises.ready(getEnvironment(), getDispatcher());
        this.worker.shutdown();
        if (!this.workerFuture.isDone()) {
            this.workerFuture.cancel(true);
        }
        this.threadPool.shutdownNow();
        notifyShutdown();
        ready.onNext(true);
        return ready;
    }
}
