package reactor.io.net.impl.netty.http;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.logging.LoggingHandler;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.Reconnect;
import reactor.io.net.config.ClientSocketOptions;
import reactor.io.net.config.SslOptions;
import reactor.io.net.http.HttpChannel;
import reactor.io.net.http.HttpClient;
import reactor.io.net.http.model.Headers;
import reactor.io.net.http.model.Method;
import reactor.io.net.impl.netty.NettyChannelStream;
import reactor.io.net.impl.netty.NettyEventLoopDispatcher;
import reactor.io.net.impl.netty.tcp.NettyTcpClient;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Stream;

/* loaded from: input_file:reactor/io/net/impl/netty/http/NettyHttpClient.class */
public class NettyHttpClient<IN, OUT> extends HttpClient<IN, OUT> {
    private final Logger log;
    private final NettyTcpClient<IN, OUT> client;
    private String lastURL;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/io/net/impl/netty/http/NettyHttpClient$NettyHttpClientChannel.class */
    public class NettyHttpClientChannel extends NettyHttpChannel<IN, OUT> {
        final Buffer body;
        private final NettyChannelStream<IN, OUT> tcpStream;
        private final HttpRequest request;
        private final Promise<Object> promise;

        public NettyHttpClientChannel(NettyChannelStream<IN, OUT> nettyChannelStream, HttpRequest httpRequest) {
            super(nettyChannelStream, NettyHttpClient.this.client, httpRequest, NettyHttpClient.this.getDefaultCodec());
            this.tcpStream = nettyChannelStream;
            this.request = httpRequest;
            this.body = new Buffer();
            this.promise = Promises.ready(getEnvironment(), getDispatcher());
        }

        @Override // reactor.io.net.impl.netty.http.NettyHttpChannel, reactor.io.net.ChannelStream
        protected void write(ByteBuffer byteBuffer, Subscriber<?> subscriber, boolean z) {
            this.body.append(new ByteBuffer[]{byteBuffer});
            if (z) {
                write((Object) 1, (Subscriber<?>) null, true);
            }
        }

        @Override // reactor.io.net.impl.netty.http.NettyHttpChannel, reactor.io.net.ChannelStream
        protected void write(Object obj, Subscriber<?> subscriber, boolean z) {
            if (HEADERS_SENT.compareAndSet(this, 0, 1)) {
                DefaultFullHttpRequest defaultFullHttpRequest = new DefaultFullHttpRequest(this.request.getProtocolVersion(), this.request.getMethod(), this.request.getUri(), Unpooled.wrappedBuffer(this.body.flip().byteBuffer()));
                HttpHeaders.setContentLength(defaultFullHttpRequest, this.body.limit());
                HttpHeaders.setHeader(defaultFullHttpRequest, Headers.CONTENT_TYPE, HttpHeaders.getHeader(this.request, Headers.CONTENT_TYPE));
                this.tcpStream.write((Object) defaultFullHttpRequest, (Subscriber<?>) this.promise, true);
            }
        }
    }

    public NettyHttpClient(Environment environment, Dispatcher dispatcher, final InetSocketAddress inetSocketAddress, ClientSocketOptions clientSocketOptions, SslOptions sslOptions, Codec<Buffer, IN, OUT> codec) {
        super(environment, dispatcher, codec);
        this.log = LoggerFactory.getLogger(NettyHttpClient.class);
        this.lastURL = "http://localhost:8080";
        this.client = new NettyTcpClient<IN, OUT>(environment, dispatcher, inetSocketAddress, clientSocketOptions, sslOptions, codec) { // from class: reactor.io.net.impl.netty.http.NettyHttpClient.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // reactor.io.net.impl.netty.tcp.NettyTcpClient, reactor.io.net.PeerStream
            public NettyChannelStream<IN, OUT> bindChannel(Object obj, long j) {
                NettyHttpClient.this.bindChannel(obj, j);
                return null;
            }

            @Override // reactor.io.net.tcp.TcpClient
            public InetSocketAddress getConnectAddress() {
                if (inetSocketAddress != null) {
                    return inetSocketAddress;
                }
                try {
                    URL url = new URL(NettyHttpClient.this.lastURL);
                    return new InetSocketAddress(url.getHost(), url.getPort());
                } catch (Exception e) {
                    throw new IllegalArgumentException(e);
                }
            }
        };
    }

    @Override // reactor.io.net.http.HttpClient
    public Promise<? extends HttpChannel<IN, OUT>> request(final Method method, final String str, final Function<HttpChannel<IN, OUT>, ? extends Publisher<? extends OUT>> function) {
        this.lastURL = str;
        Assert.isTrue((method == null || str == null) ? false : true);
        final Promise<? extends HttpChannel<IN, OUT>> prepare = Promises.prepare();
        take(1L).consume(new Consumer<HttpChannel<IN, OUT>>() { // from class: reactor.io.net.impl.netty.http.NettyHttpClient.2
            public void accept(HttpChannel<IN, OUT> httpChannel) {
                final NettyHttpClientChannel nettyHttpClientChannel = (NettyHttpClientChannel) httpChannel;
                nettyHttpClientChannel.getNettyRequest().setUri(URI.create(str).getPath()).setMethod(new HttpMethod(method.getName()));
                nettyHttpClientChannel.promise.onComplete(new Consumer<Promise<Object>>() { // from class: reactor.io.net.impl.netty.http.NettyHttpClient.2.1
                    public void accept(Promise<Object> promise) {
                        if (promise.isError()) {
                            prepare.onError(promise.reason());
                        } else {
                            prepare.onNext(nettyHttpClientChannel);
                        }
                    }
                });
                if (function != null) {
                    NettyHttpClient.this.addWritePublisher((Publisher) function.apply(httpChannel));
                }
            }
        }, new Consumer<Throwable>() { // from class: reactor.io.net.impl.netty.http.NettyHttpClient.3
            public void accept(Throwable th) {
                prepare.onError(th);
            }
        });
        return prepare;
    }

    @Override // reactor.io.net.Client
    public Promise<Boolean> open() {
        return this.client.open();
    }

    @Override // reactor.io.net.Client
    public Stream<Boolean> open(Reconnect reconnect) {
        return this.client.open(reconnect);
    }

    @Override // reactor.io.net.Client
    public Promise<Boolean> close() {
        return this.client.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public NettyHttpChannel<IN, OUT> createClientRequest(NettyChannelStream<IN, OUT> nettyChannelStream, HttpRequest httpRequest) {
        NettyHttpClientChannel nettyHttpClientChannel = new NettyHttpClientChannel(nettyChannelStream, httpRequest);
        notifyNewChannel(nettyHttpClientChannel);
        mergeWrite(nettyHttpClientChannel);
        return nettyHttpClientChannel;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.io.net.PeerStream
    public HttpChannel<IN, OUT> bindChannel(Object obj, long j) {
        SocketChannel socketChannel = (SocketChannel) obj;
        NettyChannelStream nettyChannelStream = new NettyChannelStream(getEnvironment(), getDefaultCodec(), j == -1 ? getPrefetchSize() : j, this.client, new NettyEventLoopDispatcher(socketChannel.eventLoop(), 128), getDispatcher(), socketChannel);
        ChannelPipeline pipeline = socketChannel.pipeline();
        if (this.log.isDebugEnabled()) {
            pipeline.addLast(new ChannelHandler[]{new LoggingHandler(NettyHttpClient.class)});
        }
        pipeline.addLast(new ChannelHandler[]{new HttpClientCodec()}).addLast(new ChannelHandler[]{new NettyHttpClientHandler(nettyChannelStream, this)});
        return null;
    }
}
