package reactor.core.composable;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import reactor.core.Environment;
import reactor.core.Observable;
import reactor.core.Reactor;
import reactor.core.action.Action;
import reactor.core.action.ActionUtils;
import reactor.core.action.CallbackAction;
import reactor.core.action.CallbackEventAction;
import reactor.core.action.ConnectAction;
import reactor.core.action.FilterAction;
import reactor.core.action.Flushable;
import reactor.core.action.FlushableAction;
import reactor.core.action.MapAction;
import reactor.core.action.MapManyAction;
import reactor.core.action.Pipeline;
import reactor.core.action.SupplyAction;
import reactor.core.action.TimeoutAction;
import reactor.event.Event;
import reactor.event.selector.ObjectSelector;
import reactor.event.selector.Selector;
import reactor.event.selector.Selectors;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.function.Predicate;
import reactor.function.Supplier;
import reactor.timer.Timer;
import reactor.tuple.Tuple2;
import reactor.util.Assert;

/* loaded from: input_file:reactor/core/composable/Composable.class */
public abstract class Composable<T> implements Pipeline<T> {
    public static final Event<Object> END_EVENT = Event.wrap(null);
    private final Selector acceptSelector;
    private final Object acceptKey;
    private final Selector error;
    private final Selector flush;
    private final Environment environment;
    private final Observable events;
    private final Composable<?> parent;

    protected <U> Composable(@Nullable Observable observable, @Nullable Composable<U> composable) {
        this(observable, composable, null, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    public <U> Composable(@Nullable Observable observable, @Nullable Composable<U> composable, @Nullable Tuple2<Selector, Object> tuple2, @Nullable Environment environment) {
        this.error = Selectors.anonymous();
        this.flush = Selectors.anonymous();
        Assert.state((observable == null && composable == 0) ? false : true, "One of 'observable' or 'parent'  cannot be null.");
        this.parent = composable;
        this.environment = environment;
        this.events = composable == 0 ? observable : composable.events;
        if (null == tuple2) {
            this.acceptSelector = Selectors.anonymous();
            this.acceptKey = this.acceptSelector.getObject();
        } else {
            this.acceptKey = tuple2.getT1();
            this.acceptSelector = new ObjectSelector(tuple2.getT2());
        }
    }

    public <E extends Throwable> Composable<T> when(@Nonnull final Class<E> cls, @Nonnull final Consumer<E> consumer) {
        this.events.on(this.error, new Action<E>(this.events, null) { // from class: reactor.core.composable.Composable.1
            @Override // reactor.core.action.Action
            protected void doAccept(Event<E> event) {
                if (Selectors.T(cls).matches(((Throwable) event.getData()).getClass())) {
                    consumer.accept(event.getData());
                }
            }

            @Override // reactor.core.action.Action
            public String toString() {
                return "When[" + cls.getSimpleName() + "]";
            }
        });
        return this;
    }

    public Composable<T> connect(@Nonnull Composable<T> composable) {
        connectValues(composable);
        consumeErrorAndFlush(composable);
        return this;
    }

    public Composable<T> connectValues(@Nonnull Composable<T> composable) {
        if (composable == this) {
            throw new IllegalArgumentException("Trying to consume itself, leading to erroneous recursive calls");
        }
        add((Action) new ConnectAction(composable.events, composable.acceptKey, composable.error.getObject()));
        return this;
    }

    public Composable<T> consume(@Nonnull Consumer<T> consumer) {
        add((Action) new CallbackAction(consumer, this.events, this.error.getObject()));
        return this;
    }

    public Composable<T> consumeEvent(@Nonnull Consumer<Event<T>> consumer) {
        add((Action) new CallbackEventAction(consumer, this.events, this.error.getObject()));
        return this;
    }

    public Composable<T> consume(@Nonnull Object obj, @Nonnull Observable observable) {
        add((Action) new ConnectAction(observable, obj, null));
        return this;
    }

    public <V> Composable<V> map(@Nonnull Function<T, V> function) {
        Assert.notNull(function, "Map function cannot be null.");
        Composable<V> newComposable = newComposable();
        add((Action) new MapAction(function, newComposable.getObservable(), newComposable.getAcceptKey(), newComposable.getError().getObject())).consumeErrorAndFlush(newComposable);
        return newComposable;
    }

    public <V, C extends Composable<V>> Composable<V> mapMany(@Nonnull Function<T, C> function) {
        Assert.notNull(function, "FlatMap function cannot be null.");
        Composable<V> newComposable = newComposable();
        add((Action) new MapManyAction(function, newComposable.getObservable(), newComposable.getAcceptKey(), newComposable.getError().getObject(), newComposable.getFlush().getObject())).connectErrors(newComposable);
        return newComposable;
    }

    public Composable<T> merge(Composable<T>... composableArr) {
        for (Composable<T> composable : composableArr) {
            composable.connect(this);
        }
        return this;
    }

    public Composable<T> filter(@Nonnull final Function<T, Boolean> function) {
        return filter(new Predicate<T>() { // from class: reactor.core.composable.Composable.2
            @Override // reactor.function.Predicate
            public boolean test(T t) {
                return ((Boolean) function.apply(t)).booleanValue();
            }
        }, null);
    }

    public Composable<Boolean> filter() {
        return filter(FilterAction.simplePredicate, null);
    }

    public Composable<Boolean> filter(@Nonnull Composable<Boolean> composable) {
        return filter(FilterAction.simplePredicate, composable);
    }

    public Composable<T> filter(@Nonnull Predicate<T> predicate) {
        return filter(predicate, null);
    }

    public Composable<T> filter(@Nonnull Predicate<T> predicate, Composable<T> composable) {
        Composable<T> composable2 = (Composable<T>) newComposable();
        add((Action) new FilterAction(predicate, composable2.getObservable(), composable2.getAcceptKey(), composable2.getError().getObject(), composable != null ? composable.events : null, composable != null ? composable.acceptKey : null)).consumeErrorAndFlush(composable2);
        if (composable != null) {
            consumeErrorAndFlush(composable);
        }
        return composable2;
    }

    public Composable<T> timeout(long j) {
        Assert.state(this.environment != null, "Cannot use default timer as no environment has been provided to this Stream");
        return timeout(j, this.environment.getRootTimer());
    }

    public Composable<T> timeout(long j, Timer timer) {
        Assert.state(timer != null, "Timer must be supplied");
        Composable composable = this.parent != null ? this.parent : this;
        add((Action) new TimeoutAction(composable.events, composable.flush.getObject(), this.error.getObject(), timer, j));
        return this;
    }

    public Composable<T> propagate(Supplier<T> supplier) {
        Composable<T> composable = (Composable<T>) newComposable();
        consumeFlush((Flushable<?>) new SupplyAction(supplier, composable.getObservable(), composable.getAcceptKey(), composable.getError().getObject())).connectErrors(composable);
        return composable;
    }

    @Override // reactor.core.action.Flushable
    public Composable<T> flush() {
        Composable composable = this;
        while (true) {
            Composable composable2 = composable;
            if (composable2.parent == null) {
                composable2.notifyFlush();
                return this;
            }
            composable = composable2.parent;
        }
    }

    public String debug() {
        Composable composable = this;
        while (true) {
            Composable composable2 = composable;
            if (composable2.parent == null) {
                return ActionUtils.browseReactor((Reactor) composable2.events, composable2.acceptKey, composable2.error.getObject(), composable2.flush.getObject());
            }
            composable = composable2.parent;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // reactor.core.action.Pipeline
    public Composable<T> add(Action<T> action) {
        this.events.on(this.acceptSelector, action);
        if (0 != action && Flushable.class.isAssignableFrom(action.getClass())) {
            consumeFlush((Flushable<?>) action);
        }
        return this;
    }

    @Override // reactor.core.action.Pipeline
    public Composable<T> consumeFlush(Flushable<?> flushable) {
        this.events.on(this.flush, new FlushableAction(flushable, this.events, this.error.getObject()));
        return this;
    }

    public Composable<T> connectErrors(Composable<?> composable) {
        this.events.on(this.error, new ConnectAction(composable.events, composable.error.getObject(), null));
        return this;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Composable<T> consumeErrorAndFlush(Composable<?> composable) {
        this.events.on(this.flush, new ConnectAction(composable.events, composable.flush.getObject(), null));
        return connectErrors(composable);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyFlush() {
        this.events.notify(this.flush.getObject(), new Event(Void.class));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyValue(Event<T> event) {
        this.events.notify(this.acceptKey, event);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyError(Throwable th) {
        this.events.notify(this.error.getObject(), Event.wrap(th));
    }

    protected abstract <V> Composable<V> newComposable();

    /* JADX INFO: Access modifiers changed from: protected */
    public Observable getObservable() {
        return this.events;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Object getAcceptKey() {
        return this.acceptKey;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Selector getAcceptSelector() {
        return this.acceptSelector;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Selector getFlush() {
        return this.flush;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Selector getError() {
        return this.error;
    }

    protected Composable<?> getParent() {
        return this.parent;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Environment getEnvironment() {
        return this.environment;
    }

    @Override // reactor.core.action.Pipeline
    public /* bridge */ /* synthetic */ Pipeline consumeFlush(Flushable flushable) {
        return consumeFlush((Flushable<?>) flushable);
    }
}
