package reactor.io.net;

import java.util.Iterator;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.action.Action;
import reactor.rx.broadcast.Broadcaster;

/* loaded from: input_file:reactor/io/net/PeerStream.class */
public abstract class PeerStream<IN, OUT, CONN extends ChannelStream<IN, OUT>> extends Stream<CONN> {
    private final Dispatcher dispatcher;
    protected final Broadcaster<CONN> channels;
    protected final long prefetch;
    private final FastList<OUT> writePublishers;
    private final Environment env;
    private final Codec<Buffer, IN, OUT> defaultCodec;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/io/net/PeerStream$FastList.class */
    public static final class FastList<T> implements Iterable<Publisher<? extends T>> {
        Publisher[] array;
        int size;

        FastList() {
        }

        public void add(Publisher publisher) {
            int i = this.size;
            Publisher[] publisherArr = this.array;
            if (publisherArr == null) {
                publisherArr = new Publisher[16];
                this.array = publisherArr;
            } else if (i == publisherArr.length) {
                Publisher[] publisherArr2 = new Publisher[i + (i >> 2)];
                System.arraycopy(publisherArr, 0, publisherArr2, 0, i);
                publisherArr = publisherArr2;
                this.array = publisherArr;
            }
            publisherArr[i] = publisher;
            this.size = i + 1;
        }

        @Override // java.lang.Iterable
        public Iterator<Publisher<? extends T>> iterator() {
            return new Iterator<Publisher<? extends T>>() { // from class: reactor.io.net.PeerStream.FastList.1
                int i = 0;

                @Override // java.util.Iterator
                public boolean hasNext() {
                    return this.i < FastList.this.size;
                }

                @Override // java.util.Iterator
                public void remove() {
                    throw new UnsupportedOperationException("");
                }

                @Override // java.util.Iterator
                public Publisher<? extends T> next() {
                    return FastList.this.array[this.i];
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public PeerStream(Environment environment, Dispatcher dispatcher, Codec<Buffer, IN, OUT> codec) {
        this(environment, dispatcher, codec, Long.MAX_VALUE);
    }

    protected PeerStream(Environment environment, Dispatcher dispatcher, Codec<Buffer, IN, OUT> codec, long j) {
        this.writePublishers = new FastList<>();
        this.env = (environment == null && Environment.alive()) ? Environment.get() : environment;
        this.defaultCodec = codec;
        this.prefetch = j > 0 ? j : Long.MAX_VALUE;
        this.dispatcher = dispatcher != null ? dispatcher : SynchronousDispatcher.INSTANCE;
        this.channels = Broadcaster.create(environment, SynchronousDispatcher.INSTANCE);
    }

    public void subscribe(Subscriber<? super CONN> subscriber) {
        this.channels.subscribe(subscriber);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void doPipeline(final Function<? super CONN, ? extends Publisher<? extends OUT>> function) {
        consume(new Consumer<CONN>() { // from class: reactor.io.net.PeerStream.1
            public void accept(CONN conn) {
                PeerStream.this.addWritePublisher((Publisher) function.apply(conn));
            }
        }, new Consumer<Throwable>() { // from class: reactor.io.net.PeerStream.2
            public void accept(Throwable th) {
                PeerStream.this.notifyError(th);
            }
        });
    }

    public final Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void notifyError(Throwable th) {
        this.channels.onError(th);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void notifyNewChannel(CONN conn) {
        this.channels.onNext(conn);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void notifyShutdown() {
        this.channels.onComplete();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Publisher<? extends OUT> addWritePublisher(Publisher<? extends OUT> publisher) {
        synchronized (this) {
            this.writePublishers.add(publisher);
        }
        return publisher;
    }

    protected abstract CONN bindChannel(Object obj, long j);

    /* JADX INFO: Access modifiers changed from: protected */
    public Consumer<Throwable> createErrorConsumer(ChannelStream<IN, OUT> channelStream) {
        return new Consumer<Throwable>() { // from class: reactor.io.net.PeerStream.3
            public void accept(Throwable th) {
                try {
                    PeerStream.this.channels.onError(th);
                } catch (Throwable th2) {
                    PeerStream.this.channels.onError(th2);
                }
            }
        };
    }

    protected Action<Long, Long> createBatchAction(final CONN conn, final Consumer<Throwable> consumer, Consumer<Void> consumer2) {
        return new Action<Long, Long>() { // from class: reactor.io.net.PeerStream.4
            boolean first = true;

            /* JADX INFO: Access modifiers changed from: protected */
            public void doNext(Long l) {
                shouldFlush();
                broadcastNext(l);
            }

            protected void doComplete() {
                conn.flush();
                super.doComplete();
            }

            protected void doError(Throwable th) {
                consumer.accept(th);
                super.doError(th);
            }

            private void shouldFlush() {
                if (this.first) {
                    this.first = false;
                } else {
                    conn.flush();
                }
            }
        };
    }

    protected Function<Stream<Long>, ? extends Publisher<? extends Long>> createAdaptiveDemandMapper(final CONN conn, final Consumer<Throwable> consumer) {
        return new Function<Stream<Long>, Publisher<? extends Long>>() { // from class: reactor.io.net.PeerStream.5
            /* JADX WARN: Multi-variable type inference failed */
            public Publisher<? extends Long> apply(Stream<Long> stream) {
                return stream.broadcastTo(PeerStream.this.createBatchAction(conn, consumer, PeerStream.this.completeConsumer(conn)));
            }
        };
    }

    protected Iterable<Publisher<? extends OUT>> routeChannel(CONN conn) {
        return this.writePublishers;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void mergeWrite(CONN conn) {
        int i;
        Iterable<Publisher<? extends OUT>> routeChannel = routeChannel(conn);
        if (routeChannel == this.writePublishers) {
            Publisher publisher = null;
            synchronized (this) {
                i = this.writePublishers.size;
                if (i > 0) {
                    publisher = this.writePublishers.array[0];
                }
            }
            if (i == 0) {
                return;
            }
            if (i == 1 && publisher != null) {
                subscribeChannelHandlers(Streams.create(publisher), conn);
                return;
            }
        }
        subscribeChannelHandlers(Streams.concat(routeChannel), conn);
    }

    protected Consumer<Void> completeConsumer(CONN conn) {
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void subscribeChannelHandlers(Stream<? extends OUT> stream, CONN conn) {
        if (stream.getCapacity() != Long.MAX_VALUE) {
            stream.adaptiveConsumeOn(conn.getIODispatcher(), conn.writeThrough(false), createAdaptiveDemandMapper(conn, createErrorConsumer(conn)));
        } else {
            stream.consumeOn(conn.getIODispatcher(), conn.writeThrough(true), createErrorConsumer(conn), completeConsumer(conn));
        }
    }

    @Nullable
    public final Codec<Buffer, IN, OUT> getDefaultCodec() {
        return this.defaultCodec;
    }

    @Nonnull
    public final Environment getEnvironment() {
        return this.env;
    }

    public final long getPrefetchSize() {
        return this.prefetch;
    }
}
