package reactor.io.net.impl.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.EmptyByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.io.buffer.Buffer;
import reactor.io.net.Spec;
import reactor.rx.subscription.PushSubscription;

/* loaded from: input_file:reactor/io/net/impl/netty/NettyNetChannelInboundHandler.class */
/**
 * Netty inbound handler that forwards channel events to a Reactive Streams {@link Subscriber}
 * through a {@link PushSubscription}.
 *
 * <p>Inbound messages are handled in one of three ways, depending on the decoder configured on
 * the associated {@link NettyChannelStream}:
 * <ul>
 *   <li>{@link Spec#NOOP_DECODER}, or a message that is not a {@link ByteBuf}: the message is
 *       passed downstream unchanged;</li>
 *   <li>no decoder ({@code null}): the {@link ByteBuf} is wrapped in a {@link Buffer} and passed
 *       downstream;</li>
 *   <li>any other decoder: the bytes are run through the decoder, and bytes the decoder did not
 *       consume are kept in {@code remainder} and prepended to the next read.</li>
 * </ul>
 *
 * @param <IN> type of the values delivered to the subscriber
 */
public class NettyNetChannelInboundHandler<IN> extends ChannelInboundHandlerAdapter {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final Subscriber<? super IN> subscriber;
    protected final NettyChannelStream<IN, ?> channelStream;

    /** Bytes left unconsumed by the decoder on a previous read, or {@code null} when there are none. */
    private volatile ByteBuf remainder;

    /** Created on the first {@code channelActive}; {@code null} until then. */
    private volatile PushSubscription<IN> channelSubscription;

    /**
     * @param subscriber         downstream subscriber receiving decoded values and terminal signals
     * @param nettyChannelStream stream that supplies the decoder and is registered once the channel
     *                           becomes active
     */
    public NettyNetChannelInboundHandler(Subscriber<? super IN> subscriber, NettyChannelStream<IN, ?> nettyChannelStream) {
        this.subscriber = subscriber;
        this.channelStream = nettyChannelStream;
    }

    /**
     * @return the subscription handed to the subscriber, or {@code null} if the channel has not
     *         become active yet
     */
    public PushSubscription<IN> subscription() {
        return this.channelSubscription;
    }

    /**
     * @return the channel stream this handler was created with
     */
    public NettyChannelStream<IN, ?> channelStream() {
        return this.channelStream;
    }

    /**
     * On first activation creates the subscription, registers the channel stream and signals
     * {@code onSubscribe}; on later activations only propagates the event and logs a resume.
     * Any failure is reported to the subscriber through {@code onError}.
     */
    @Override
    public void channelActive(final ChannelHandlerContext channelHandlerContext) throws Exception {
        try {
            if (this.channelSubscription != null) {
                // Already subscribed: just propagate the event.
                super.channelActive(channelHandlerContext);
                this.log.debug("RESUME: {}", channelHandlerContext.channel());
                return;
            }

            this.channelSubscription = new PushSubscription<IN>(null, this.subscriber) {
                @Override
                protected void onRequest(long j) {
                    // An unbounded request switches the channel to auto-read.
                    if (j == Long.MAX_VALUE) {
                        channelHandlerContext.channel().config().setAutoRead(true);
                    }
                    channelHandlerContext.read();
                }

                @Override
                public void cancel() {
                    super.cancel();
                    if (channelHandlerContext.channel().isOpen()) {
                        channelHandlerContext.close();
                    }
                }
            };
            this.channelStream.registerOnPeer();
            this.subscriber.onSubscribe(this.channelSubscription);
            super.channelActive(channelHandlerContext);
        } catch (Throwable th) {
            this.subscriber.onError(th);
        }
    }

    /**
     * Propagates the event and triggers another read while a bounded demand greater than one is
     * still pending. Does nothing once the subscription is complete.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.channelSubscription.isComplete()) {
            return;
        }
        try {
            super.channelReadComplete(channelHandlerContext);
            // Read the counter once so both comparisons see the same value.
            long pending = this.channelSubscription.pendingRequestSignals();
            if (pending != Long.MAX_VALUE && pending > 1) {
                channelHandlerContext.read();
            }
        } catch (Throwable th) {
            this.channelSubscription.onError(th);
        }
    }

    /**
     * Completes the subscription, propagates the event and releases any bytes still buffered in
     * {@code remainder}.
     */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        try {
            this.channelSubscription.onComplete();
            super.channelInactive(channelHandlerContext);
        } catch (Throwable th) {
            this.channelSubscription.onError(th);
        } finally {
            // Leftover undecoded bytes are not consumed by this handler after completion
            // (channelRead returns early once the subscription is complete), so release them
            // here rather than leaking the buffer.
            ByteBuf leftover = this.remainder;
            if (leftover != null) {
                this.remainder = null;
                leftover.release();
            }
        }
    }

    /**
     * Dispatches an inbound message according to the configured decoder (see the class
     * description). Any failure, including one thrown by the decoder, is reported through the
     * subscription's {@code onError}.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        try {
            // NOTE(review): a ByteBuf dropped on this early return is not released here — confirm
            // whether a later pipeline stage or the caller is expected to release it.
            if (this.channelSubscription.isComplete() || obj.getClass() == EmptyByteBuf.class) {
                return;
            }

            if (this.channelStream.getDecoder() == Spec.NOOP_DECODER || !(obj instanceof ByteBuf)) {
                // Unchecked: with a no-op decoder the message is passed through unchanged.
                this.channelSubscription.onNext((IN) obj);
                return;
            }

            ByteBuf byteBuf = (ByteBuf) obj;
            if (this.channelStream.getDecoder() == null) {
                // Unchecked: without a decoder the subscriber receives the raw Buffer.
                this.channelSubscription.onNext((IN) new Buffer(byteBuf.nioBuffer()));
                return;
            }

            if (this.remainder == null) {
                decodeFresh(byteBuf);
            } else {
                decodeWithRemainder(channelHandlerContext, byteBuf);
            }
        } catch (Throwable th) {
            this.channelSubscription.onError(th);
        }
    }

    /**
     * Logs well-known disconnect errors at debug level and forwards every error to the
     * subscription.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        String message = th.getMessage();
        if ("Broken pipe".equals(message) || "Connection reset by peer".equals(message)) {
            this.log.debug("{} {}", channelHandlerContext.channel(), message);
        }
        this.channelSubscription.onError(th);
    }

    /** Decodes a buffer when nothing is pending; keeps it as the remainder if bytes are left over. */
    private void decodeFresh(ByteBuf byteBuf) {
        try {
            passToConnection(byteBuf);
        } finally {
            // Runs on failure too, so the buffer is either retained or released, never dropped.
            if (byteBuf.isReadable()) {
                this.remainder = byteBuf;
            } else {
                byteBuf.release();
            }
        }
    }

    /** Appends the new bytes to the pending remainder, decodes it, then compacts or frees it. */
    private void decodeWithRemainder(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        if (bufferHasSufficientCapacity(this.remainder, byteBuf)) {
            this.remainder.writeBytes(byteBuf);
        } else {
            ByteBuf combined = createCombinedBuffer(this.remainder, byteBuf, channelHandlerContext);
            this.remainder.release();
            this.remainder = combined;
        }
        // The incoming bytes have been copied into the remainder.
        byteBuf.release();

        try {
            passToConnection(this.remainder);
        } finally {
            if (this.remainder.isReadable()) {
                this.remainder.discardSomeReadBytes();
            } else {
                this.remainder.release();
                this.remainder = null;
            }
        }
    }

    /** @return whether {@code byteBuf} can absorb all readable bytes of {@code byteBuf2} without exceeding its max capacity */
    private boolean bufferHasSufficientCapacity(ByteBuf byteBuf, ByteBuf byteBuf2) {
        return byteBuf.writerIndex() <= byteBuf.maxCapacity() - byteBuf2.readableBytes();
    }

    /** Allocates a new buffer holding the readable bytes of both arguments, in order. */
    private ByteBuf createCombinedBuffer(ByteBuf byteBuf, ByteBuf byteBuf2, ChannelHandlerContext channelHandlerContext) {
        ByteBuf buffer = channelHandlerContext.alloc().buffer(byteBuf.readableBytes() + byteBuf2.readableBytes());
        buffer.writeBytes(byteBuf);
        buffer.writeBytes(byteBuf2);
        return buffer;
    }

    /**
     * Runs the decoder over the readable bytes of {@code byteBuf}, emits a non-null result
     * downstream and advances the reader index by the number of bytes the decoder consumed.
     */
    @SuppressWarnings("unchecked")
    private void passToConnection(ByteBuf byteBuf) {
        Buffer buffer = new Buffer(byteBuf.nioBuffer());
        int start = buffer.position();
        if (null != this.channelStream.getDecoder() && null != buffer.byteBuffer()) {
            Object decoded = this.channelStream.getDecoder().apply(buffer);
            if (decoded != null) {
                // Unchecked: the decoder is expected to produce values of type IN.
                this.channelSubscription.onNext((IN) decoded);
            }
        }
        // The Buffer view advanced independently of the ByteBuf; mirror the consumed byte count.
        byteBuf.skipBytes(buffer.position() - start);
    }
}
