package reactor.io.net;

import java.nio.ByteBuffer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.IOStreams;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.broadcast.Broadcaster;

/* loaded from: input_file:reactor/io/net/ChannelStream.class */
public abstract class ChannelStream<IN, OUT> extends Stream<IN> implements Channel<IN, OUT> {
    protected final Logger log = LoggerFactory.getLogger(getClass());
    protected final PeerStream<IN, OUT, ChannelStream<IN, OUT>> peer;
    protected final Broadcaster<IN> contentStream;
    private final Environment env;
    private final Dispatcher ioDispatcher;
    private final Dispatcher eventsDispatcher;
    private final Function<Buffer, IN> decoder;
    private final Function<OUT, Buffer> encoder;
    private final long prefetch;

    /* loaded from: input_file:reactor/io/net/ChannelStream$WriteConsumer.class */
    final class WriteConsumer implements Consumer<OUT> {
        final boolean autoflush;

        public WriteConsumer(boolean z) {
            this.autoflush = z;
        }

        public void accept(OUT out) {
            try {
                if (null != ChannelStream.this.encoder) {
                    Buffer buffer = (Buffer) ChannelStream.this.encoder.apply(out);
                    if (buffer.remaining() > 0) {
                        ChannelStream.this.write(buffer, (Subscriber<?>) null, this.autoflush);
                    }
                } else if (Buffer.class == out.getClass()) {
                    ChannelStream.this.write((Buffer) out, (Subscriber<?>) null, this.autoflush);
                } else {
                    ChannelStream.this.write(out, (Subscriber<?>) null, this.autoflush);
                }
            } catch (Throwable th) {
                ChannelStream.this.peer.notifyError(th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ChannelStream(@Nonnull Environment environment, @Nullable Codec<Buffer, IN, OUT> codec, long j, @Nonnull PeerStream<IN, OUT, ChannelStream<IN, OUT>> peerStream, @Nonnull Dispatcher dispatcher, @Nonnull Dispatcher dispatcher2) {
        Assert.notNull(environment, "IO Dispatcher cannot be null");
        Assert.notNull(environment, "Events Reactor cannot be null");
        this.env = environment;
        this.prefetch = j;
        this.ioDispatcher = dispatcher;
        this.peer = peerStream;
        this.eventsDispatcher = dispatcher2;
        this.contentStream = Broadcaster.create(environment, dispatcher2);
        if (null != codec) {
            this.decoder = codec.decoder(new Consumer<IN>() { // from class: reactor.io.net.ChannelStream.1
                public void accept(IN in) {
                    ChannelStream.this.doDecoded(in);
                }
            });
            this.encoder = codec;
        } else {
            this.decoder = null;
            this.encoder = null;
        }
    }

    public void subscribe(Subscriber<? super IN> subscriber) {
        this.contentStream.subscribe(subscriber);
    }

    @Override // reactor.io.net.Channel
    public final void sink(Publisher<? extends OUT> publisher) {
        this.peer.subscribeChannelHandlers(Streams.create(publisher), this);
    }

    public final void sinkBuffers(Publisher<? extends Buffer> publisher) {
        this.peer.subscribeChannelHandlers(Streams.create(publisher).map(new Function<Buffer, OUT>() { // from class: reactor.io.net.ChannelStream.2
            /* JADX WARN: Multi-variable type inference failed */
            public OUT apply(Buffer buffer) {
                return null != ChannelStream.this.encoder ? (OUT) ((Buffer) ChannelStream.this.encoder.apply(buffer)) : buffer;
            }
        }), this);
    }

    public final Environment getEnvironment() {
        return this.env;
    }

    public final Dispatcher getDispatcher() {
        return this.eventsDispatcher;
    }

    public final long getCapacity() {
        return this.prefetch;
    }

    public final Dispatcher getIODispatcher() {
        return this.ioDispatcher;
    }

    public final Function<Buffer, IN> getDecoder() {
        return this.decoder;
    }

    public final Function<OUT, Buffer> getEncoder() {
        return this.encoder;
    }

    public final Subscriber<IN> in() {
        return this.contentStream;
    }

    public final <DECODED> Stream<DECODED> decode(Codec<IN, DECODED, ?> codec) {
        return IOStreams.decode(codec, this);
    }

    public abstract Object delegate();

    public void registerOnPeer() {
        this.peer.notifyNewChannel(this);
        this.peer.mergeWrite(this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Consumer<OUT> writeThrough(boolean z) {
        return new WriteConsumer(z);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void cascadeErrorToPeer(Throwable th) {
        this.log.error("", th);
        this.peer.notifyError(th);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void doDecoded(IN in) {
        this.contentStream.onNext(in);
    }

    protected void write(Buffer buffer, Subscriber<?> subscriber, boolean z) {
        write(buffer.byteBuffer(), subscriber, z);
    }

    protected abstract void write(ByteBuffer byteBuffer, Subscriber<?> subscriber, boolean z);

    protected abstract void write(Object obj, Subscriber<?> subscriber, boolean z);

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract void flush();
}
