package reactor.io.net;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.rx.IOStreams;
import reactor.rx.Stream;
import reactor.rx.Streams;

/* loaded from: input_file:reactor/io/net/ChannelStream.class */
/**
 * Abstract {@link Stream} of {@code IN} values that is also a {@link ReactorChannel} accepting
 * {@code OUT} values for writing.
 *
 * <p>When a {@link Codec} is supplied at construction time, its decoder and encoder are kept and
 * exposed through {@link #getDecoder()} and {@link #getEncoder()}; decoded values are handed to
 * {@link #doDecoded(Object)}. When no codec is supplied both functions are {@code null}.
 *
 * <p>Subclasses provide the actual write subscription logic in
 * {@link #doSubscribeWriter(Publisher, Subscriber)} and expose the underlying channel object via
 * {@link #delegate()}.
 *
 * @param <IN>  type of the values emitted by this stream
 * @param <OUT> type of the values accepted for writing
 */
public abstract class ChannelStream<IN, OUT> extends Stream<IN> implements ReactorChannel<IN, OUT> {
    protected static final Logger log = LoggerFactory.getLogger(ChannelStream.class);

    private final Environment env;
    private final Dispatcher eventsDispatcher;
    /** Decoder obtained from the codec, or {@code null} when no codec was given. */
    private final Function<Buffer, IN> decoder;
    /** Encoder obtained from the codec, or {@code null} when no codec was given. */
    private final Function<OUT, Buffer> encoder;
    private final long prefetch;

    /**
     * Creates a channel stream.
     *
     * <p>NOTE(review): the decompiler reports this constructor was originally {@code protected};
     * it is left {@code public} so any code relying on the widened access keeps compiling.
     *
     * @param environment the environment returned by {@link #getEnvironment()}; not validated here
     * @param codec       optional codec; when {@code null}, no decoder or encoder is installed
     * @param prefetch    the value reported by {@link #getCapacity()}
     * @param dispatcher  the dispatcher returned by {@link #getDispatcher()}; must not be {@code null}
     */
    public ChannelStream(Environment environment, Codec<Buffer, IN, OUT> codec, long prefetch, Dispatcher dispatcher) {
        Assert.notNull(dispatcher, "Events Reactor cannot be null");
        this.env = environment;
        this.prefetch = prefetch;
        this.eventsDispatcher = dispatcher;
        if (codec != null) {
            // The consumer is only registered here; it forwards each decoded value to the subclass hook.
            this.decoder = codec.decoder(new Consumer<IN>() {
                @Override
                public void accept(IN decoded) {
                    doDecoded(decoded);
                }
            });
            this.encoder = codec.encoder();
        } else {
            this.decoder = null;
            this.encoder = null;
        }
    }

    /**
     * Returns a stream that, on each subscription, delegates to
     * {@link #doSubscribeWriter(Publisher, Subscriber)} with the given publisher as the source.
     *
     * <p>If {@code publisher} is not already a {@link Stream} it is wrapped in one whose
     * {@code getCapacity()} reports this channel's prefetch value.
     *
     * <p>The decompiler renamed this method from {@code writeWith} (merged with a bridge method);
     * the generated name is kept so existing callers are unaffected.
     *
     * @param publisher the source of values to write; must not be {@code null}
     * @return a stream of {@code Void} whose subscription triggers the write
     * @throws NullPointerException if {@code publisher} is {@code null}
     */
    @Override // reactor.io.net.ReactorChannel
    public final Stream<Void> mo1writeWith(final Publisher<? extends OUT> publisher) {
        if (publisher == null) {
            // Fail fast at call time rather than on first subscription.
            throw new NullPointerException("publisher");
        }

        final Stream<? extends OUT> source;
        if (publisher instanceof Stream) {
            // Statically safe: a Stream that is a Publisher<? extends OUT> is a Stream<? extends OUT>.
            source = (Stream<? extends OUT>) publisher;
        } else {
            source = new Stream<OUT>() {
                @Override
                public void subscribe(Subscriber<? super OUT> subscriber) {
                    publisher.subscribe(subscriber);
                }

                public long getCapacity() {
                    return prefetch;
                }
            };
        }

        return new Stream<Void>() {
            @Override
            public void subscribe(Subscriber<? super Void> subscriber) {
                doSubscribeWriter(source, subscriber);
            }
        };
    }

    /**
     * Writes a publisher of {@link Buffer}s through {@link #mo1writeWith(Publisher)}.
     *
     * <p>Each buffer is passed through the encoder when one is present, otherwise it is forwarded
     * unchanged. In both cases the value is treated as {@code OUT} via an unchecked cast, so this
     * method is only type-safe when {@code OUT} is compatible with {@link Buffer}.
     *
     * @param publisher the source of buffers to write
     * @return a stream of {@code Void} whose subscription triggers the write
     */
    public final Stream<Void> writeBufferWith(Publisher<? extends Buffer> publisher) {
        Stream<OUT> encoded = Streams.create(publisher).map(new Function<Buffer, OUT>() {
            @Override
            @SuppressWarnings("unchecked") // Buffer is deliberately viewed as OUT; see method Javadoc.
            public OUT apply(Buffer buffer) {
                if (encoder == null) {
                    return (OUT) buffer;
                }
                Buffer bytes = encoder.apply((OUT) buffer);
                return (OUT) bytes;
            }
        });
        return mo1writeWith(encoded);
    }

    /** @return the environment passed to the constructor (may be {@code null}, it is not validated) */
    public final Environment getEnvironment() {
        return this.env;
    }

    /** @return the dispatcher passed to the constructor; never {@code null} */
    public final Dispatcher getDispatcher() {
        return this.eventsDispatcher;
    }

    /** @return the prefetch value passed to the constructor */
    public final long getCapacity() {
        return this.prefetch;
    }

    /** @return the decoder obtained from the codec, or {@code null} if no codec was supplied */
    public final Function<Buffer, IN> getDecoder() {
        return this.decoder;
    }

    /** @return the encoder obtained from the codec, or {@code null} if no codec was supplied */
    public final Function<OUT, Buffer> getEncoder() {
        return this.encoder;
    }

    /**
     * Decodes this stream with the given codec by delegating to
     * {@link IOStreams#decode(Codec, Publisher)}.
     *
     * @param codec     the codec applied to the {@code IN} values of this stream
     * @param <DECODED> the decoded element type
     * @return the decoded stream
     */
    public final <DECODED> Stream<DECODED> decode(Codec<IN, DECODED, ?> codec) {
        return IOStreams.decode(codec, this);
    }

    /** @return the underlying object this channel wraps, as defined by the subclass */
    public abstract Object delegate();

    /**
     * Subscribes {@code subscriber} to the outcome of writing everything {@code publisher} emits.
     *
     * @param publisher  the source of values to write
     * @param subscriber the subscriber of the resulting {@code Void} stream
     */
    protected abstract void doSubscribeWriter(Publisher<? extends OUT> publisher, Subscriber<? super Void> subscriber);

    /**
     * Hook invoked with each value handed to the consumer registered on the codec's decoder.
     *
     * @param in the decoded value
     */
    protected abstract void doDecoded(IN in);
}
