package reactor.io.net.impl.netty.http;

import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import java.net.InetSocketAddress;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.support.Assert;
import reactor.io.net.ReactorChannel;
import reactor.io.net.http.HttpChannel;
import reactor.io.net.http.model.Headers;
import reactor.io.net.http.model.HttpHeaders;
import reactor.io.net.http.model.Method;
import reactor.io.net.http.model.Protocol;
import reactor.io.net.http.model.ResponseHeaders;
import reactor.io.net.http.model.Status;
import reactor.io.net.http.model.Transfer;
import reactor.io.net.impl.netty.NettyChannelStream;

/* loaded from: input_file:reactor/io/net/impl/netty/http/NettyHttpChannel.class */
public class NettyHttpChannel<IN, OUT> extends HttpChannel<IN, OUT> {
    private final NettyChannelStream<IN, OUT> tcpStream;
    private final HttpRequest nettyRequest;
    private final NettyHttpHeaders headers;
    private HttpResponse nettyResponse;
    private NettyHttpResponseHeaders responseHeaders;

    public NettyHttpChannel(NettyChannelStream<IN, OUT> nettyChannelStream, HttpRequest httpRequest) {
        super(nettyChannelStream.getEnvironment(), nettyChannelStream.getCapacity(), nettyChannelStream.getDispatcher());
        this.tcpStream = nettyChannelStream;
        this.nettyRequest = httpRequest;
        this.nettyResponse = new DefaultHttpResponse(httpRequest.getProtocolVersion(), HttpResponseStatus.OK);
        this.headers = new NettyHttpHeaders(httpRequest);
        this.responseHeaders = new NettyHttpResponseHeaders(this.nettyResponse);
        responseHeader(ResponseHeaders.TRANSFER_ENCODING, "chunked");
    }

    @Override // reactor.io.net.ChannelStream
    protected void doSubscribeWriter(Publisher<? extends OUT> publisher, Subscriber<? super Void> subscriber) {
        this.tcpStream.doSubscribeWriter(publisher, subscriber);
    }

    @Override // reactor.io.net.ChannelStream
    protected void doDecoded(IN in) {
        this.tcpStream.doDecoded(in);
    }

    public void subscribe(Subscriber<? super IN> subscriber) {
        this.tcpStream.subscribe(subscriber);
    }

    @Override // reactor.io.net.http.HttpChannel
    public Protocol protocol() {
        HttpVersion protocolVersion = this.nettyRequest.getProtocolVersion();
        if (protocolVersion.equals(HttpVersion.HTTP_1_0)) {
            return Protocol.HTTP_1_0;
        }
        if (protocolVersion.equals(HttpVersion.HTTP_1_1)) {
            return Protocol.HTTP_1_1;
        }
        throw new IllegalStateException(protocolVersion.protocolName() + " not supported");
    }

    @Override // reactor.io.net.http.HttpChannel
    protected void doHeader(String str, String str2) {
        this.headers.set(str, str2);
    }

    @Override // reactor.io.net.http.HttpChannel
    protected void doAddHeader(String str, String str2) {
        this.headers.add(str, str2);
    }

    @Override // reactor.io.net.http.HttpChannel
    public String uri() {
        return this.nettyRequest.getUri();
    }

    @Override // reactor.io.net.http.HttpChannel
    public Method method() {
        return new Method(this.nettyRequest.getMethod().name());
    }

    @Override // reactor.io.net.http.HttpChannel
    public HttpHeaders headers() {
        return this.headers;
    }

    public HttpRequest getNettyRequest() {
        return this.nettyRequest;
    }

    @Override // reactor.io.net.http.HttpChannel
    public Status responseStatus() {
        return Status.valueOf(this.nettyResponse.getStatus().code());
    }

    @Override // reactor.io.net.http.HttpChannel
    public void doResponseStatus(Status status) {
        this.nettyResponse.setStatus(HttpResponseStatus.valueOf(status.getCode()));
    }

    @Override // reactor.io.net.http.HttpChannel
    public Transfer transfer() {
        if ("chunked".equals(this.headers.get(ResponseHeaders.TRANSFER_ENCODING))) {
            Assert.isTrue(Protocol.HTTP_1_1.equals(protocol()));
            return Transfer.CHUNKED;
        }
        if (this.headers.get(ResponseHeaders.TRANSFER_ENCODING) == null) {
            return Transfer.NON_CHUNKED;
        }
        throw new IllegalStateException("Can't determine a valide transfer based on headers and protocol");
    }

    @Override // reactor.io.net.http.HttpChannel
    public HttpChannel<IN, OUT> transfer(Transfer transfer) {
        switch (transfer) {
            case EVENT_STREAM:
                responseHeader(Headers.CONTENT_TYPE, "text/event-stream");
            case CHUNKED:
                Assert.isTrue(Protocol.HTTP_1_1.equals(protocol()));
                responseHeader(ResponseHeaders.TRANSFER_ENCODING, "chunked");
                break;
            case NON_CHUNKED:
                responseHeaders().remove(ResponseHeaders.TRANSFER_ENCODING);
                break;
        }
        return this;
    }

    @Override // reactor.io.net.http.HttpChannel
    public ResponseHeaders responseHeaders() {
        return this.responseHeaders;
    }

    @Override // reactor.io.net.http.HttpChannel
    protected void doResponseHeader(String str, String str2) {
        this.responseHeaders.set(str, str2);
    }

    @Override // reactor.io.net.http.HttpChannel
    protected void doAddResponseHeader(String str, String str2) {
        this.responseHeaders.add(str, str2);
    }

    public HttpResponse getNettyResponse() {
        return this.nettyResponse;
    }

    @Override // reactor.io.net.ChannelStream
    public SocketChannel delegate() {
        return this.tcpStream.delegate();
    }

    @Override // reactor.io.net.ReactorChannel
    public InetSocketAddress remoteAddress() {
        return this.tcpStream.remoteAddress();
    }

    @Override // reactor.io.net.ReactorChannel
    public ReactorChannel.ConsumerSpec on() {
        return this.tcpStream.on();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setNettyResponse(HttpResponse httpResponse) {
        this.nettyResponse = httpResponse;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean checkHeader() {
        return HEADERS_SENT.compareAndSet(this, 0, 1);
    }

    @Override // reactor.io.net.http.HttpChannel
    public boolean isKeepAlive() {
        return this.headers.isKeepAlive();
    }

    @Override // reactor.io.net.http.HttpChannel
    public HttpChannel<IN, OUT> keepAlive(boolean z) {
        this.responseHeaders.keepAlive(z);
        return this;
    }
}
