package reactor.io.net.impl.zmq.tcp;

import com.gs.collections.api.list.MutableList;
import com.gs.collections.impl.block.function.checked.CheckedFunction0;
import com.gs.collections.impl.block.predicate.checked.CheckedPredicate;
import com.gs.collections.impl.list.mutable.FastList;
import com.gs.collections.impl.list.mutable.SynchronizedMutableList;
import com.gs.collections.impl.map.mutable.SynchronizedMutableMap;
import com.gs.collections.impl.map.mutable.UnifiedMap;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.codec.StandardCodecs;
import reactor.io.net.ChannelStream;
import reactor.io.net.Client;
import reactor.io.net.NetStreams;
import reactor.io.net.Server;
import reactor.io.net.Spec;
import reactor.io.net.impl.zmq.ZeroMQClientSocketOptions;
import reactor.io.net.impl.zmq.ZeroMQServerSocketOptions;
import reactor.io.net.tcp.TcpClient;
import reactor.io.net.tcp.TcpServer;
import reactor.rx.Promise;

/* loaded from: input_file:reactor/io/net/impl/zmq/tcp/ZeroMQ.class */
public class ZeroMQ<T> {
    private static final SynchronizedMutableMap<Integer, String> SOCKET_TYPES = SynchronizedMutableMap.of(UnifiedMap.newMap());
    private final Logger log;
    private final MutableList<TcpClient> clients;
    private final MutableList<TcpServer> servers;
    private final Environment env;
    private final Dispatcher dispatcher;
    private final ZContext zmqCtx;
    private volatile Codec<Buffer, T, T> codec;
    private volatile boolean shutdown;

    public ZeroMQ(Environment environment) {
        this(environment, environment.getDefaultDispatcher());
    }

    public ZeroMQ(Environment environment, String str) {
        this(environment, environment.getDispatcher(str));
    }

    public ZeroMQ(Environment environment, Dispatcher dispatcher) {
        this.log = LoggerFactory.getLogger(getClass());
        this.clients = SynchronizedMutableList.of(FastList.newList());
        this.servers = SynchronizedMutableList.of(FastList.newList());
        this.codec = StandardCodecs.PASS_THROUGH_CODEC;
        this.shutdown = false;
        this.env = environment;
        this.dispatcher = dispatcher;
        this.zmqCtx = new ZContext();
        this.zmqCtx.setLinger(100);
    }

    public static String findSocketTypeName(final int i) {
        return (String) SOCKET_TYPES.getIfAbsentPut(Integer.valueOf(i), new CheckedFunction0<String>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQ.1
            /* renamed from: safeValue, reason: merged with bridge method [inline-methods] */
            public String m17safeValue() throws Exception {
                for (Field field : ZMQ.class.getDeclaredFields()) {
                    if (Integer.TYPE.isAssignableFrom(field.getType())) {
                        field.setAccessible(true);
                        try {
                            if (i == field.getInt(null)) {
                                return field.getName();
                            }
                            continue;
                        } catch (IllegalAccessException e) {
                        }
                    }
                }
                return "";
            }
        });
    }

    public ZeroMQ<T> codec(Codec<Buffer, T, T> codec) {
        this.codec = codec;
        return this;
    }

    public Promise<ChannelStream<T, T>> dealer(String str) {
        return createClient(str, 5);
    }

    public Promise<ChannelStream<T, T>> push(String str) {
        return createClient(str, 8);
    }

    public Promise<ChannelStream<T, T>> pull(String str) {
        return createServer(str, 7);
    }

    public Promise<ChannelStream<T, T>> request(String str) {
        return createClient(str, 3);
    }

    public Promise<ChannelStream<T, T>> reply(String str) {
        return createServer(str, 4);
    }

    public Promise<ChannelStream<T, T>> router(String str) {
        return createServer(str, 6);
    }

    public Promise<ChannelStream<T, T>> createClient(final String str, final int i) {
        Assert.isTrue(!this.shutdown, "This ZeroMQ instance has been shut down");
        TcpClient tcpClient = NetStreams.tcpClient((Class<? extends TcpClient>) ZeroMQTcpClient.class, new Function<Spec.TcpClient<T, T>, Spec.TcpClient<T, T>>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQ.2
            public Spec.TcpClient<T, T> apply(Spec.TcpClient<T, T> tcpClient2) {
                return ((Spec.TcpClient) ((Spec.TcpClient) tcpClient2.env(ZeroMQ.this.env)).dispatcher(ZeroMQ.this.dispatcher)).codec(ZeroMQ.this.codec).options(new ZeroMQClientSocketOptions().context(ZeroMQ.this.zmqCtx).connectAddresses(str).socketType(i));
            }
        });
        this.clients.add(tcpClient);
        Promise<ChannelStream<T, T>> next = tcpClient.next();
        tcpClient.open();
        return next;
    }

    public Promise<ChannelStream<T, T>> createServer(final String str, final int i) {
        Assert.isTrue(!this.shutdown, "This ZeroMQ instance has been shut down");
        TcpServer tcpServer = NetStreams.tcpServer((Class<? extends TcpServer>) ZeroMQTcpServer.class, new Function<Spec.TcpServer<T, T>, Spec.TcpServer<T, T>>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQ.3
            public Spec.TcpServer<T, T> apply(Spec.TcpServer<T, T> tcpServer2) {
                return (Spec.TcpServer) ((Spec.TcpServer) ((Spec.TcpServer) tcpServer2.env(ZeroMQ.this.env)).dispatcher(ZeroMQ.this.dispatcher)).codec(ZeroMQ.this.codec).options(new ZeroMQServerSocketOptions().context(ZeroMQ.this.zmqCtx).listenAddresses(str).socketType(i));
            }
        });
        Promise<ChannelStream<T, T>> next = tcpServer.next();
        this.servers.add(tcpServer);
        tcpServer.start();
        return next;
    }

    public void shutdown() {
        if (this.shutdown) {
            return;
        }
        this.shutdown = true;
        this.servers.removeIf(new CheckedPredicate<Server>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQ.4
            public boolean safeAccept(Server server) throws Exception {
                Promise<Boolean> shutdown = server.shutdown();
                shutdown.await(60L, TimeUnit.SECONDS);
                Assert.isTrue(shutdown.isSuccess(), "Server " + server + " not properly shut down");
                return true;
            }
        });
        this.clients.removeIf(new CheckedPredicate<Client>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQ.5
            public boolean safeAccept(Client client) throws Exception {
                Promise<Boolean> close = client.close();
                close.await(60L, TimeUnit.SECONDS);
                Assert.isTrue(close.isSuccess(), "Client " + client + " not properly shut down");
                return true;
            }
        });
    }
}
