package reactor.core.composable;

import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import reactor.core.Environment;
import reactor.core.Observable;
import reactor.core.action.Action;
import reactor.core.action.BatchAction;
import reactor.core.action.BufferAction;
import reactor.core.action.CollectAction;
import reactor.core.action.CountAction;
import reactor.core.action.DistinctAction;
import reactor.core.action.Flushable;
import reactor.core.action.ForEachAction;
import reactor.core.action.MovingWindowAction;
import reactor.core.action.Pipeline;
import reactor.core.action.ReduceAction;
import reactor.core.action.ScanAction;
import reactor.core.action.WhenAction;
import reactor.core.action.WindowAction;
import reactor.event.selector.Selector;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.function.Functions;
import reactor.function.Predicate;
import reactor.function.Supplier;
import reactor.function.support.Tap;
import reactor.timer.Timer;
import reactor.tuple.Tuple2;
import reactor.util.Assert;

/* loaded from: input_file:reactor/core/composable/Stream.class */
public class Stream<T> extends Composable<T> {
    protected final int batchSize;

    public Stream(@Nullable Observable observable, int i, @Nullable Composable<?> composable, @Nullable Environment environment) {
        this(observable, i, composable, null, environment);
    }

    public Stream(@Nullable Observable observable, int i, @Nullable Composable<?> composable, Tuple2<Selector, Object> tuple2, @Nullable Environment environment) {
        super(observable, composable, tuple2, environment);
        this.batchSize = i;
    }

    @Override // reactor.core.composable.Composable
    public Stream<T> consume(@Nonnull Consumer<T> consumer) {
        return (Stream) super.consume((Consumer) consumer);
    }

    @Override // reactor.core.composable.Composable
    public Stream<T> connect(@Nonnull Composable<T> composable) {
        return (Stream) super.connect((Composable) composable);
    }

    @Override // reactor.core.composable.Composable
    public Stream<T> consume(@Nonnull Object obj, @Nonnull Observable observable) {
        return (Stream) super.consume(obj, observable);
    }

    @Override // reactor.core.composable.Composable, reactor.core.action.Flushable
    public Stream<T> flush() {
        return (Stream) super.flush();
    }

    @Override // reactor.core.composable.Composable
    public <E extends Throwable> Stream<T> when(@Nonnull Class<E> cls, @Nonnull Consumer<E> consumer) {
        return (Stream) super.when((Class) cls, (Consumer) consumer);
    }

    @Override // reactor.core.composable.Composable
    public <V> Stream<V> map(@Nonnull Function<T, V> function) {
        return (Stream) super.map((Function) function);
    }

    @Override // reactor.core.composable.Composable
    public <V, C extends Composable<V>> Stream<V> mapMany(@Nonnull Function<T, C> function) {
        return (Stream) super.mapMany((Function) function);
    }

    @Override // reactor.core.composable.Composable
    public Stream<Boolean> filter() {
        return (Stream) super.filter();
    }

    @Override // reactor.core.composable.Composable
    public Stream<Boolean> filter(@Nonnull Composable<Boolean> composable) {
        return (Stream) super.filter(composable);
    }

    @Override // reactor.core.composable.Composable
    public Stream<T> filter(@Nonnull Function<T, Boolean> function) {
        return (Stream) super.filter((Function) function);
    }

    @Override // reactor.core.composable.Composable
    public Stream<T> filter(@Nonnull Predicate<T> predicate) {
        return (Stream) super.filter((Predicate) predicate);
    }

    @Override // reactor.core.composable.Composable
    public Stream<T> filter(@Nonnull Predicate<T> predicate, Composable<T> composable) {
        return (Stream) super.filter((Predicate) predicate, (Composable) composable);
    }

    @Override // reactor.core.composable.Composable
    public Stream<T> connectValues(@Nonnull Composable<T> composable) {
        return (Stream) super.connectValues((Composable) composable);
    }

    @Override // reactor.core.composable.Composable
    public Stream<T> timeout(long j, Timer timer) {
        return (Stream) super.timeout(j, timer);
    }

    @Override // reactor.core.composable.Composable
    public Stream<T> timeout(long j) {
        return (Stream) super.timeout(j);
    }

    @Override // reactor.core.composable.Composable
    public Stream<T> merge(Composable<T>... composableArr) {
        return (Stream) super.merge((Composable[]) composableArr);
    }

    @Override // reactor.core.composable.Composable, reactor.core.action.Pipeline
    public Stream<T> consumeFlush(@Nonnull Flushable<?> flushable) {
        return (Stream) super.consumeFlush(flushable);
    }

    @Override // reactor.core.composable.Composable
    public Stream<T> connectErrors(Composable<?> composable) {
        return (Stream) super.connectErrors(composable);
    }

    public Stream<T> propagate(Iterable<T> iterable) {
        Stream<T> stream = (Stream<T>) newComposable(this.batchSize);
        consumeFlush((Flushable<?>) new ForEachAction(iterable, stream.getObservable(), stream.getAcceptKey(), stream.getError().getObject(), stream.getFlush().getObject())).connectErrors((Composable<?>) stream);
        return stream;
    }

    @Override // reactor.core.composable.Composable
    public Stream<T> propagate(Supplier<T> supplier) {
        return (Stream) super.propagate((Supplier) supplier);
    }

    public Stream<T> flushWhen(Predicate<T> predicate) {
        add((Action) new WhenAction(predicate, getObservable(), getFlush().getObject(), getError().getObject()));
        return this;
    }

    public Stream<T> first() {
        return first(this.batchSize);
    }

    public Stream<T> first(int i) {
        Assert.state(i > 0, "Cannot first() an unbounded Stream. Try extracting a batch first.");
        Stream<T> stream = (Stream<T>) newComposable(i);
        add((Action) new BatchAction(i, stream.getObservable(), null, stream.getError().getObject(), null, stream.getAcceptKey())).connectErrors(stream);
        return stream;
    }

    public Stream<T> last() {
        return last(this.batchSize);
    }

    public Stream<T> last(int i) {
        Assert.state(i > 0, "Cannot last() an unbounded Stream. Try extracting a batch first.");
        Stream<T> stream = (Stream<T>) newComposable(i);
        add((Action) new BatchAction(i, stream.getObservable(), null, stream.getError().getObject(), stream.getAcceptKey(), null)).connectErrors(stream);
        return stream;
    }

    public Stream<T> distinct() {
        Stream<T> stream = (Stream<T>) newComposable(this.batchSize);
        add((Action) new DistinctAction(stream.getObservable(), stream.getAcceptKey(), stream.getError().getObject())).consumeErrorAndFlush(stream);
        return stream;
    }

    public <V> Stream<V> split() {
        return split(this.batchSize);
    }

    public <V> Stream<V> split(int i) {
        Stream<V> newComposable = newComposable(i);
        getObservable().on(getAcceptSelector(), new ForEachAction(newComposable.getObservable(), newComposable.getAcceptKey(), newComposable.getError().getObject(), newComposable.getFlush().getObject()));
        connectErrors((Composable<?>) newComposable);
        return newComposable;
    }

    public Tap<T> tap() {
        Tap<T> tap = new Tap<>();
        consume((Consumer) tap);
        return tap;
    }

    public Stream<T> buffer() {
        return buffer(this.batchSize);
    }

    public Stream<T> buffer(int i) {
        Stream<T> stream = (Stream<T>) newComposable(i);
        add((Action) new BufferAction(i, stream.getObservable(), stream.getAcceptKey(), stream.getError().getObject(), stream.getFlush().getObject()));
        return stream;
    }

    public Stream<T> bufferWithErrors() {
        return bufferWithErrors(this.batchSize);
    }

    public Stream<T> bufferWithErrors(int i) {
        Stream<T> stream = (Stream<T>) newComposable(i);
        BufferAction bufferAction = new BufferAction(i, stream.getObservable(), stream.getError().getObject(), null, null);
        getObservable().on(getError(), bufferAction);
        consumeFlush((Flushable<?>) bufferAction);
        add((Action) new BufferAction(i, stream.getObservable(), stream.getAcceptKey(), stream.getError().getObject(), stream.getFlush().getObject()));
        return stream;
    }

    public Stream<List<T>> collect() {
        return collect(this.batchSize);
    }

    public Stream<List<T>> collect(int i) {
        Stream<List<T>> stream = (Stream<List<T>>) newComposable(1);
        add((Action) new CollectAction(i, stream.getObservable(), stream.getAcceptKey(), stream.getError().getObject())).connectErrors(stream);
        return stream;
    }

    public Stream<List<T>> window(int i) {
        return window(i, TimeUnit.MILLISECONDS);
    }

    public Stream<List<T>> movingWindow(int i, int i2) {
        return movingWindow(i, TimeUnit.MILLISECONDS, i2);
    }

    public Stream<List<T>> window(int i, TimeUnit timeUnit) {
        return window(i, timeUnit, 0);
    }

    public Stream<List<T>> movingWindow(int i, TimeUnit timeUnit, int i2) {
        return movingWindow(i, timeUnit, 0, i2);
    }

    public Stream<List<T>> window(int i, TimeUnit timeUnit, int i2) {
        Assert.state(getEnvironment() != null, "Cannot use default timer as no environment has been provided to this Stream");
        return window(i, timeUnit, i2, getEnvironment().getRootTimer());
    }

    public Stream<List<T>> movingWindow(int i, TimeUnit timeUnit, int i2, int i3) {
        Assert.state(getEnvironment() != null, "Cannot use default timer as no environment has been provided to this Stream");
        return movingWindow(i, timeUnit, i2, i3, getEnvironment().getRootTimer());
    }

    public Stream<List<T>> window(int i, TimeUnit timeUnit, int i2, Timer timer) {
        Assert.state(timer != null, "Timer must be supplied");
        Stream<List<T>> stream = (Stream<List<T>>) newComposable(1);
        add((Action) new WindowAction(stream.getObservable(), stream.getAcceptKey(), stream.getError().getObject(), timer, i, timeUnit, i2)).connectErrors(stream);
        return stream;
    }

    public Stream<List<T>> movingWindow(int i, TimeUnit timeUnit, int i2, int i3, Timer timer) {
        Assert.state(timer != null, "Timer must be supplied");
        Stream<List<T>> stream = (Stream<List<T>>) newComposable(1);
        add((Action) new MovingWindowAction(stream.getObservable(), stream.getAcceptKey(), stream.getError().getObject(), timer, i, timeUnit, i2, i3)).connectErrors(stream);
        return stream;
    }

    public <A> Stream<A> reduce(@Nonnull Function<Tuple2<T, A>, A> function, A a) {
        return reduce(function, Functions.supplier(a), this.batchSize);
    }

    public <A> Stream<A> reduce(@Nonnull Function<Tuple2<T, A>, A> function, @Nullable Supplier<A> supplier, int i) {
        Stream<A> stream = (Stream<A>) newComposable(1);
        add((Action) new ReduceAction(i, supplier, function, stream.getObservable(), stream.getAcceptKey(), stream.getError().getObject())).connectErrors(stream);
        return stream;
    }

    public <A> Stream<A> reduce(@Nonnull Function<Tuple2<T, A>, A> function) {
        return reduce(function, null, this.batchSize);
    }

    public <A> Stream<A> scan(@Nonnull Function<Tuple2<T, A>, A> function, A a) {
        return scan((Function) function, (Supplier) Functions.supplier(a));
    }

    public <A> Stream<A> scan(@Nonnull Function<Tuple2<T, A>, A> function, @Nullable Supplier<A> supplier) {
        Stream<A> stream = (Stream<A>) newComposable(1);
        add((Action) new ScanAction(supplier, function, stream.getObservable(), stream.getAcceptKey(), stream.getError().getObject())).consumeErrorAndFlush(stream);
        return stream;
    }

    public <A> Stream<A> scan(@Nonnull Function<Tuple2<T, A>, A> function) {
        return scan((Function) function, (Supplier) null);
    }

    public Stream<T> count(Stream<Long> stream) {
        add((Action) new CountAction(stream.getObservable(), stream.getAcceptKey(), stream.getError().getObject()));
        return this;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.core.composable.Composable
    public <V> Stream<V> newComposable() {
        return newComposable(this.batchSize);
    }

    protected <V> Stream<V> newComposable(int i) {
        return new Stream<>(null, i, this, getEnvironment());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BufferAction<T> bufferConsumer(int i) {
        BufferAction<T> bufferAction = new BufferAction<>(i, getObservable(), getAcceptKey(), getError(), getFlush());
        consumeFlush((Flushable<?>) bufferAction);
        return bufferAction;
    }

    @Override // reactor.core.composable.Composable
    public /* bridge */ /* synthetic */ Composable connectErrors(Composable composable) {
        return connectErrors((Composable<?>) composable);
    }

    @Override // reactor.core.composable.Composable, reactor.core.action.Pipeline
    public /* bridge */ /* synthetic */ Composable consumeFlush(@Nonnull Flushable flushable) {
        return consumeFlush((Flushable<?>) flushable);
    }

    @Override // reactor.core.composable.Composable
    public /* bridge */ /* synthetic */ Composable filter(@Nonnull Composable composable) {
        return filter((Composable<Boolean>) composable);
    }

    @Override // reactor.core.composable.Composable, reactor.core.action.Pipeline
    public /* bridge */ /* synthetic */ Pipeline consumeFlush(@Nonnull Flushable flushable) {
        return consumeFlush((Flushable<?>) flushable);
    }
}
