package reactor.io.net.impl.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Subscriber;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.fn.Consumer;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.Channel;
import reactor.io.net.ChannelStream;
import reactor.io.net.PeerStream;
import reactor.rx.subscription.PushSubscription;

/* loaded from: input_file:reactor/io/net/impl/netty/NettyChannelStream.class */
public class NettyChannelStream<IN, OUT> extends ChannelStream<IN, OUT> {
    private final Channel ioChannel;

    /* loaded from: input_file:reactor/io/net/impl/netty/NettyChannelStream$NettyConsumerSpec.class */
    private class NettyConsumerSpec implements Channel.ConsumerSpec {
        private NettyConsumerSpec() {
        }

        @Override // reactor.io.net.Channel.ConsumerSpec
        public Channel.ConsumerSpec close(final Consumer<Void> consumer) {
            NettyChannelStream.this.ioChannel.pipeline().addLast(new ChannelHandler[]{new ChannelDuplexHandler() { // from class: reactor.io.net.impl.netty.NettyChannelStream.NettyConsumerSpec.1
                public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
                    consumer.accept((Object) null);
                    super.channelInactive(channelHandlerContext);
                }
            }});
            return this;
        }

        @Override // reactor.io.net.Channel.ConsumerSpec
        public Channel.ConsumerSpec readIdle(long j, final Consumer<Void> consumer) {
            NettyChannelStream.this.ioChannel.pipeline().addFirst(new ChannelHandler[]{new IdleStateHandler(j, 0L, 0L, TimeUnit.MILLISECONDS) { // from class: reactor.io.net.impl.netty.NettyChannelStream.NettyConsumerSpec.2
                protected void channelIdle(ChannelHandlerContext channelHandlerContext, IdleStateEvent idleStateEvent) throws Exception {
                    if (idleStateEvent.state() == IdleState.READER_IDLE) {
                        consumer.accept((Object) null);
                    }
                    super.channelIdle(channelHandlerContext, idleStateEvent);
                }
            }});
            return this;
        }

        @Override // reactor.io.net.Channel.ConsumerSpec
        public Channel.ConsumerSpec writeIdle(long j, final Consumer<Void> consumer) {
            NettyChannelStream.this.ioChannel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(0L, j, 0L, TimeUnit.MILLISECONDS) { // from class: reactor.io.net.impl.netty.NettyChannelStream.NettyConsumerSpec.3
                protected void channelIdle(ChannelHandlerContext channelHandlerContext, IdleStateEvent idleStateEvent) throws Exception {
                    if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
                        consumer.accept((Object) null);
                    }
                    super.channelIdle(channelHandlerContext, idleStateEvent);
                }
            }});
            return this;
        }
    }

    public NettyChannelStream(@Nonnull Environment environment, @Nullable Codec<Buffer, IN, OUT> codec, long j, @Nonnull PeerStream<IN, OUT, ChannelStream<IN, OUT>> peerStream, @Nonnull Dispatcher dispatcher, @Nonnull Dispatcher dispatcher2, @Nonnull io.netty.channel.Channel channel) {
        super(environment, codec, j, peerStream, dispatcher, dispatcher2);
        this.ioChannel = channel;
    }

    @Override // reactor.io.net.Channel
    public InetSocketAddress remoteAddress() {
        return (InetSocketAddress) this.ioChannel.remoteAddress();
    }

    @Override // reactor.io.net.Channel
    public Channel.ConsumerSpec on() {
        return new NettyConsumerSpec();
    }

    @Override // reactor.io.net.ChannelStream
    public io.netty.channel.Channel delegate() {
        return this.ioChannel;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.io.net.ChannelStream
    public void doDecoded(IN in) {
        NettyNetChannelInboundHandler nettyNetChannelInboundHandler = this.ioChannel.pipeline().get(NettyNetChannelInboundHandler.class);
        PushSubscription<IN> subscription = nettyNetChannelInboundHandler == null ? null : nettyNetChannelInboundHandler.subscription();
        if (subscription != null) {
            subscription.onNext(in);
        } else {
            super.doDecoded(in);
        }
    }

    @Override // reactor.io.net.ChannelStream
    public void write(ByteBuffer byteBuffer, Subscriber<?> subscriber, boolean z) {
        ByteBuf buffer = this.ioChannel.alloc().buffer(byteBuffer.remaining());
        buffer.writeBytes(byteBuffer);
        write(buffer, subscriber, z);
    }

    @Override // reactor.io.net.ChannelStream
    public void write(Object obj, final Subscriber<?> subscriber, boolean z) {
        (z ? this.ioChannel.writeAndFlush(obj) : this.ioChannel.write(obj)).addListener(new ChannelFutureListener() { // from class: reactor.io.net.impl.netty.NettyChannelStream.1
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (!channelFuture.isSuccess()) {
                    Throwable cause = channelFuture.cause();
                    if (null != subscriber) {
                        subscriber.onError(cause);
                    }
                    NettyChannelStream.this.cascadeErrorToPeer(cause);
                }
                if (null != subscriber) {
                    subscriber.onComplete();
                }
            }
        });
    }

    @Override // reactor.io.net.ChannelStream
    public void flush() {
        if (this.ioChannel.isActive()) {
            this.ioChannel.flush();
        }
    }

    public String toString() {
        return "NettyNetChannel{channel=" + this.ioChannel + '}';
    }
}
