package reactor.core;

import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.event.Event;
import reactor.event.dispatch.Dispatcher;
import reactor.event.dispatch.SynchronousDispatcher;
import reactor.event.registry.CachingRegistry;
import reactor.event.registry.Registration;
import reactor.event.registry.Registry;
import reactor.event.routing.ArgumentConvertingConsumerInvoker;
import reactor.event.routing.ConsumerFilteringEventRouter;
import reactor.event.routing.EventRouter;
import reactor.event.selector.ClassSelector;
import reactor.event.selector.Selector;
import reactor.event.selector.Selectors;
import reactor.filter.PassThroughFilter;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.function.Supplier;
import reactor.function.support.SingleUseConsumer;
import reactor.util.Assert;
import reactor.util.UUIDUtils;

/* loaded from: input_file:reactor/core/Reactor.class */
public class Reactor implements Observable {
    /** Router used by the constructor whenever it is handed a {@code null} {@link EventRouter}. */
    private static final EventRouter DEFAULT_EVENT_ROUTER = new ConsumerFilteringEventRouter(new PassThroughFilter(), new ArgumentConvertingConsumerInvoker(null));
    /** Dispatcher every notification is handed to; the constructor substitutes a {@link SynchronousDispatcher} for {@code null}. */
    private final Dispatcher dispatcher;
    /** Registry that holds the consumers registered through {@code on(...)} and is queried by key on notification. */
    private final Registry<Consumer<? extends Event<?>>> consumerRegistry;
    /** Router passed to the dispatcher together with each event; never {@code null} after construction. */
    private final EventRouter eventRouter;
    /** Error callback passed to the dispatcher and router on every dispatch. */
    private final Consumer<Throwable> dispatchErrorHandler;
    /** Lazily created identifier; written only inside the synchronized {@code getId()}. */
    private volatile UUID id;

    /* loaded from: input_file:reactor/core/Reactor$ReplyToConsumer.class */
    public class ReplyToConsumer<E extends Event<?>, V> implements Consumer<E> {
        private final Function<E, V> fn;

        private ReplyToConsumer(Function<E, V> function) {
            this.fn = function;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v13, types: [reactor.event.Event] */
        /* JADX WARN: Type inference failed for: r0v16, types: [reactor.event.Event] */
        /* JADX WARN: Type inference failed for: r0v19, types: [reactor.event.Event] */
        /* JADX WARN: Type inference failed for: r0v22, types: [reactor.core.Observable] */
        /* JADX WARN: Type inference failed for: r0v4, types: [reactor.core.Observable] */
        @Override // reactor.function.Consumer
        public void accept(E e) {
            E wrap;
            ?? replyToObservable;
            Reactor reactor2 = Reactor.this;
            if (ReplyToEvent.class.isAssignableFrom(e.getClass()) && 0 != (replyToObservable = ((ReplyToEvent) e).getReplyToObservable())) {
                reactor2 = replyToObservable;
            }
            try {
                V apply = this.fn.apply(e);
                if (null == apply) {
                    wrap = new Event(Void.class);
                } else {
                    wrap = Event.class.isAssignableFrom(apply.getClass()) ? (Event) apply : Event.wrap(apply);
                }
                reactor2.notify(e.getReplyTo(), (Object) wrap);
            } catch (Throwable th) {
                reactor2.notify((Object) th.getClass(), (Class<?>) Event.wrap(th));
            }
        }

        public Function<E, V> getDelegate() {
            return this.fn;
        }
    }

    /* loaded from: input_file:reactor/core/Reactor$ReplyToEvent.class */
    /**
     * Event that additionally remembers the {@link Observable} on which a reply should be published.
     *
     * @param <T> type of the event payload
     */
    public static class ReplyToEvent<T> extends Event<T> {
        private static final long serialVersionUID = 1937884784799135647L;
        private final Observable replyToObservable;

        /**
         * Builds the event from its individual parts.
         *
         * @param headers       headers handed to the {@link Event} constructor
         * @param payload       data handed to the {@link Event} constructor
         * @param replyKey      value stored through {@code setReplyTo}
         * @param target        observable replies should be published on
         * @param errorConsumer error consumer handed to the {@link Event} constructor
         */
        private ReplyToEvent(Event.Headers headers, T payload, Object replyKey, Observable target, Consumer<Throwable> errorConsumer) {
            super(headers, payload, errorConsumer);
            setReplyTo(replyKey);
            this.replyToObservable = target;
        }

        /**
         * Builds the event by taking headers, data, reply key and error consumer from {@code source}.
         *
         * @param source event whose parts are reused
         * @param target observable replies should be published on
         */
        private ReplyToEvent(Event<T> source, Observable target) {
            this(source.getHeaders(), source.getData(), source.getReplyTo(), target, source.getErrorConsumer());
        }

        /**
         * Creates a new {@code ReplyToEvent} carrying {@code x} as data while reusing this
         * event's headers, reply key, reply observable and error consumer.
         *
         * @param x   the data for the new event
         * @param <X> type of the new data
         * @return the new event
         */
        @Override // reactor.event.Event
        public <X> Event<X> copy(X x) {
            return new ReplyToEvent<X>(getHeaders(), x, getReplyTo(), this.replyToObservable, getErrorConsumer());
        }

        /**
         * @return the observable replies should be published on
         */
        public Observable getReplyToObservable() {
            return this.replyToObservable;
        }
    }

    /**
     * Creates a Reactor with the given dispatcher and no explicit event router.
     *
     * @param dispatcher dispatcher to use; may be {@code null}
     */
    public Reactor(@Nullable Dispatcher dispatcher) {
        this(dispatcher, (EventRouter) null);
    }

    /**
     * Creates a Reactor with the given dispatcher and event router and no explicit error handlers.
     *
     * @param dispatcher  dispatcher to use; may be {@code null}
     * @param eventRouter router to use; may be {@code null}
     */
    public Reactor(@Nullable Dispatcher dispatcher, @Nullable EventRouter eventRouter) {
        this(dispatcher, eventRouter, (Consumer<Throwable>) null, (Consumer<Throwable>) null);
    }

    /**
     * Creates a Reactor backed by a fresh {@link CachingRegistry}.
     *
     * @param dispatcher  dispatcher to use; may be {@code null}
     * @param eventRouter router to use; may be {@code null}
     * @param consumer    handler installed as the dispatch error handler; may be {@code null}
     * @param consumer2   handler invoked for {@link Throwable} events; may be {@code null}
     */
    public Reactor(@Nullable Dispatcher dispatcher, @Nullable EventRouter eventRouter, @Nullable Consumer<Throwable> consumer, @Nullable Consumer<Throwable> consumer2) {
        this(new CachingRegistry<Consumer<? extends Event<?>>>(), dispatcher, eventRouter, consumer, consumer2);
    }

    public Reactor(@Nonnull Registry<Consumer<? extends Event<?>>> registry, @Nullable Dispatcher dispatcher, @Nullable EventRouter eventRouter, @Nullable Consumer<Throwable> consumer, @Nullable final Consumer<Throwable> consumer2) {
        Assert.notNull(registry, "Consumer Registry cannot be null.");
        this.consumerRegistry = registry;
        this.dispatcher = null == dispatcher ? new SynchronousDispatcher() : dispatcher;
        this.eventRouter = null == eventRouter ? DEFAULT_EVENT_ROUTER : eventRouter;
        if (null == consumer) {
            this.dispatchErrorHandler = new Consumer<Throwable>() { // from class: reactor.core.Reactor.1
                @Override // reactor.function.Consumer
                public void accept(Throwable th) {
                    Class<?> cls = th.getClass();
                    Reactor.this.eventRouter.route(cls, Event.wrap(th).setKey(cls), Reactor.this.consumerRegistry.select(cls), null, null);
                }
            };
        } else {
            this.dispatchErrorHandler = consumer;
        }
        on(new ClassSelector(Throwable.class), new Consumer<Event<Throwable>>() { // from class: reactor.core.Reactor.2
            Logger log;

            @Override // reactor.function.Consumer
            public void accept(Event<Throwable> event) {
                if (null != consumer2) {
                    consumer2.accept(event.getData());
                    return;
                }
                if (null == this.log) {
                    this.log = LoggerFactory.getLogger(Reactor.class);
                }
                this.log.error(event.getData().getMessage(), event.getData());
            }
        });
    }

    /**
     * Returns this Reactor's identifier, creating it through {@link UUIDUtils#create()} on the first call.
     *
     * @return the identifier; the same value on every subsequent call
     */
    public synchronized UUID getId() {
        UUID current = this.id;
        if (null == current) {
            current = UUIDUtils.create();
            this.id = current;
        }
        return current;
    }

    /**
     * @return the registry holding this Reactor's consumers
     */
    public Registry<Consumer<? extends Event<?>>> getConsumerRegistry() {
        return consumerRegistry;
    }

    /**
     * @return the dispatcher this Reactor hands events to
     */
    public Dispatcher getDispatcher() {
        return dispatcher;
    }

    /**
     * @return the router this Reactor passes along with dispatched events
     */
    public EventRouter getEventRouter() {
        return eventRouter;
    }

    /**
     * Tells whether at least one registration selected for the key has not been cancelled.
     *
     * @param obj the key to look up in the consumer registry
     * @return {@code true} if a non-cancelled registration matches the key, {@code false} otherwise
     */
    @Override // reactor.core.Observable
    public boolean respondsToKey(Object obj) {
        for (Registration<? extends Consumer<? extends Event<?>>> registration : this.consumerRegistry.select(obj)) {
            if (!registration.isCancelled()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Registers a consumer under a selector in this Reactor's consumer registry.
     *
     * @param selector selector to register under; checked with {@code Assert.notNull}
     * @param consumer consumer to register; checked with {@code Assert.notNull}
     * @param <E>      type of event the consumer accepts
     * @return the registration returned by the registry
     */
    @Override // reactor.core.Observable
    public <E extends Event<?>> Registration<Consumer<E>> on(Selector selector, Consumer<E> consumer) {
        Assert.notNull(selector, "Selector cannot be null.");
        Assert.notNull(consumer, "Consumer cannot be null.");
        Registration<Consumer<E>> registration = (Registration<Consumer<E>>) consumerRegistry.register(selector, consumer);
        return registration;
    }

    /**
     * Registers a function under a selector by wrapping it in a {@link ReplyToConsumer}.
     *
     * @param selector selector to register under
     * @param function function applied to each matching event
     * @param <E>      type of event consumed
     * @param <V>      type of value the function produces
     * @return the registration of the wrapping consumer
     */
    @Override // reactor.core.Observable
    public <E extends Event<?>, V> Registration<Consumer<E>> receive(Selector selector, Function<E, V> function) {
        ReplyToConsumer<E, V> replier = new ReplyToConsumer<E, V>(function);
        return on(selector, replier);
    }

    /**
     * Stamps the event with the key and hands it to the dispatcher.
     *
     * @param obj      notification key; checked with {@code Assert.notNull}
     * @param e        event to publish; checked with {@code Assert.notNull}
     * @param consumer consumer passed to the dispatcher as the last argument; may be {@code null}
     * @param <E>      type of the event
     * @return this Reactor
     */
    @Override // reactor.core.Observable
    public <E extends Event<?>> Reactor notify(Object obj, E e, Consumer<E> consumer) {
        Assert.notNull(obj, "Key cannot be null.");
        Assert.notNull(e, "Event cannot be null.");
        // The key is written onto the event before it leaves this thread.
        e.setKey(obj);
        dispatcher.dispatch(obj, e, consumerRegistry, dispatchErrorHandler, eventRouter, consumer);
        return this;
    }

    /**
     * Publishes an event under a key, passing no extra consumer to the dispatcher.
     *
     * @param obj notification key
     * @param e   event to publish
     * @param <E> type of the event
     * @return this Reactor
     */
    @Override // reactor.core.Observable
    public <E extends Event<?>> Reactor notify(Object obj, E e) {
        // Typed null selects the three-argument overload without widening the event to Object.
        return notify(obj, e, (Consumer<E>) null);
    }

    /**
     * Publishes the event obtained from a supplier under a key, passing no extra consumer to the dispatcher.
     *
     * @param obj notification key
     * @param s   supplier asked once for the event to publish
     * @param <S> type of the supplier
     * @return this Reactor
     */
    @Override // reactor.core.Observable
    public <S extends Supplier<? extends Event<?>>> Reactor notify(Object obj, S s) {
        // The supplied value keeps its Event type so the three-argument overload applies.
        return notify(obj, s.get(), null);
    }

    /**
     * Publishes an empty {@code Void} event under a key.
     *
     * @param obj notification key
     * @return this Reactor
     */
    @Override // reactor.core.Observable
    public Reactor notify(Object obj) {
        Event<Void> signal = new Event<Void>(Void.class);
        return notify(obj, signal, (Consumer<Event<Void>>) null);
    }

    /**
     * Publishes an event under a key, wrapped in a {@link ReplyToEvent} that names this Reactor as the reply target.
     *
     * @param obj notification key
     * @param e   event whose headers, data, reply key and error consumer are copied into the wrapper
     * @param <E> type of the event
     * @return this Reactor
     */
    @Override // reactor.core.Observable
    public <E extends Event<?>> Reactor send(Object obj, E e) {
        // The payload type is erased at runtime; Object is only a placeholder for the wildcard.
        @SuppressWarnings("unchecked")
        Event<Object> request = (Event<Object>) e;
        return notify(obj, new ReplyToEvent<Object>(request, this));
    }

    /**
     * Publishes the event obtained from a supplier under a key, wrapped in a
     * {@link ReplyToEvent} that names this Reactor as the reply target.
     *
     * @param obj notification key
     * @param s   supplier asked once for the event to wrap
     * @param <S> type of the supplier
     * @return this Reactor
     */
    @Override // reactor.core.Observable
    public <S extends Supplier<? extends Event<?>>> Reactor send(Object obj, S s) {
        // The payload type is erased at runtime; Object is only a placeholder for the wildcard.
        @SuppressWarnings("unchecked")
        Event<Object> request = (Event<Object>) s.get();
        return notify(obj, new ReplyToEvent<Object>(request, this));
    }

    /**
     * Publishes an event under a key, wrapped in a {@link ReplyToEvent} that names the given observable as the reply target.
     *
     * @param obj        notification key
     * @param e          event whose headers, data, reply key and error consumer are copied into the wrapper
     * @param observable observable recorded on the wrapper as the reply target
     * @param <E>        type of the event
     * @return this Reactor
     */
    @Override // reactor.core.Observable
    public <E extends Event<?>> Reactor send(Object obj, E e, Observable observable) {
        // The payload type is erased at runtime; Object is only a placeholder for the wildcard.
        @SuppressWarnings("unchecked")
        Event<Object> request = (Event<Object>) e;
        return notify(obj, new ReplyToEvent<Object>(request, observable));
    }

    /**
     * Publishes the event obtained from a supplier under a key, wrapped in a
     * {@link ReplyToEvent} that names the given observable as the reply target.
     *
     * @param obj        notification key
     * @param s          supplier asked once for the event to wrap
     * @param observable observable recorded on the wrapper as the reply target
     * @param <S>        type of the supplier
     * @return this Reactor
     */
    @Override // reactor.core.Observable
    public <S extends Supplier<? extends Event<?>>> Reactor send(Object obj, S s, Observable observable) {
        // The payload type is erased at runtime; Object is only a placeholder for the wildcard.
        @SuppressWarnings("unchecked")
        Event<Object> request = (Event<Object>) s.get();
        return notify(obj, new ReplyToEvent<Object>(request, observable));
    }

    /**
     * Publishes a request and registers a consumer for its reply.
     *
     * <p>The consumer is wrapped in a {@link SingleUseConsumer} and registered under a fresh
     * anonymous selector with {@code cancelAfterUse()}; the selector's object is then set as
     * the request's {@code replyTo} before the request is published.
     *
     * @param obj      notification key for the request
     * @param req      request event; its {@code replyTo} is overwritten
     * @param consumer consumer that receives the reply
     * @param <REQ>    type of the request event
     * @param <RESP>   type of the reply event
     * @return this Reactor
     */
    @Override // reactor.core.Observable
    public <REQ extends Event<?>, RESP extends Event<?>> Reactor sendAndReceive(Object obj, REQ req, Consumer<RESP> consumer) {
        Selector replySelector = Selectors.anonymous();
        // Register the reply handler first so it is in place before the request goes out.
        on(replySelector, new SingleUseConsumer<RESP>(consumer)).cancelAfterUse();
        notify(obj, req.setReplyTo(replySelector.getObject()));
        return this;
    }

    /**
     * Publishes the request obtained from a supplier and registers a consumer for its reply.
     *
     * @param obj      notification key for the request
     * @param s        supplier asked once for the request event
     * @param consumer consumer that receives the reply
     * @param <REQ>    type of the request event
     * @param <RESP>   type of the reply event
     * @param <S>      type of the supplier
     * @return this Reactor
     */
    @Override // reactor.core.Observable
    public <REQ extends Event<?>, RESP extends Event<?>, S extends Supplier<REQ>> Reactor sendAndReceive(Object obj, S s, Consumer<RESP> consumer) {
        REQ request = s.get();
        return sendAndReceive(obj, request, consumer);
    }

    /**
     * Returns a consumer that dispatches each accepted event to the consumers selected for
     * the key at the time this method is called.
     *
     * <p>The registration list and its size are captured once, when the returned consumer is
     * created; each accepted event is stamped with the key and dispatched once per captured
     * registration.
     *
     * @param obj the key used both to select registrations and to stamp events
     * @param <T> type of the event payload
     * @return the prepared consumer
     */
    @Override // reactor.core.Observable
    public <T> Consumer<Event<T>> prepare(final Object obj) {
        return new Consumer<Event<T>>() {
            // Captured once at preparation time; later accept() calls reuse this list and size.
            final List<Registration<? extends Consumer<? extends Event<?>>>> regs = Reactor.this.consumerRegistry.select(obj);
            final int size = this.regs.size();

            @Override // reactor.function.Consumer
            public void accept(Event<T> event) {
                int index = 0;
                while (index < this.size) {
                    Reactor.this.dispatcher.dispatch(event.setKey(obj), Reactor.this.eventRouter, this.regs.get(index).getObject(), Reactor.this.dispatchErrorHandler);
                    index++;
                }
            }
        };
    }

    /**
     * Returns a batch consumer for the key that has no completion callback.
     *
     * @param obj the key used to select consumers for each batched event
     * @param <T> type of the event payload
     * @return the batch consumer
     */
    @Override // reactor.core.Observable
    public <T> Consumer<Iterable<Event<T>>> batchNotify(Object obj) {
        return this.<T>batchNotify(obj, null);
    }

    /**
     * Returns a consumer that dispatches a whole batch of events as a single task.
     *
     * <p>The returned consumer wraps the accepted {@link Iterable} in one event and hands it
     * to the dispatcher. When that event is processed, the registrations for the key are
     * selected once, every batched event is routed to every selected registration's consumer,
     * and finally {@code consumer} (if not {@code null}) is invoked with {@code null}.
     *
     * @param obj      the key used to select consumers when the batch is processed
     * @param consumer callback invoked after the whole batch was routed; may be {@code null}
     * @param <T>      type of the event payload
     * @return the batch consumer
     */
    @Override // reactor.core.Observable
    public <T> Consumer<Iterable<Event<T>>> batchNotify(final Object obj, final Consumer<Void> consumer) {
        return new Consumer<Iterable<Event<T>>>() {
            final Consumer<Event<Iterable<Event<T>>>> batchConsumer = new Consumer<Event<Iterable<Event<T>>>>() {
                @Override // reactor.function.Consumer
                public void accept(Event<Iterable<Event<T>>> event) {
                    // Selected once per batch, not once per event.
                    List<Registration<? extends Consumer<? extends Event<?>>>> registrations = Reactor.this.consumerRegistry.select(obj);
                    for (Event<T> batched : event.getData()) {
                        for (Registration<? extends Consumer<? extends Event<?>>> registration : registrations) {
                            // Raw cast kept as in the original call; route's parameter type is declared outside this file.
                            Reactor.this.eventRouter.route(null, batched, null, (Consumer) registration.getObject(), Reactor.this.dispatchErrorHandler);
                        }
                    }
                    if (consumer != null) {
                        consumer.accept(null);
                    }
                }
            };

            @Override // reactor.function.Consumer
            public void accept(Iterable<Event<T>> iterable) {
                Reactor.this.dispatcher.dispatch(null, Event.wrap(iterable), null, Reactor.this.dispatchErrorHandler, Reactor.this.eventRouter, this.batchConsumer);
            }
        };
    }

    /**
     * Compares this Reactor with another object.
     *
     * <p>Two Reactors are equal when they are the same instance, or when they have the same
     * runtime class, equal hash codes and equal ids. The hash comparison alone is not
     * sufficient because distinct instances may share a hash code, so the id returned by
     * {@link #getId()} is compared as well.
     *
     * @param obj the object to compare with; may be {@code null}
     * @return {@code true} if {@code obj} is considered equal to this Reactor
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        Reactor other = (Reactor) obj;
        // Hash codes are a cheap pre-check only; the ids make the decision.
        return hashCode() == other.hashCode() && getId().equals(other.getId());
    }

    /**
     * Hands a task to the dispatcher that calls {@code consumer.accept(t)} when it is run.
     *
     * <p>The task is dispatched with {@code null} key, event and registry; the event passed
     * to the task is ignored.
     *
     * @param consumer consumer to invoke
     * @param t        value passed to the consumer
     * @param <T>      type of the value
     */
    public <T> void schedule(final Consumer<T> consumer, final T t) {
        Consumer<Event<?>> task = new Consumer<Event<?>>() {
            @Override // reactor.function.Consumer
            public void accept(Event<?> ignoredEvent) {
                consumer.accept(t);
            }
        };
        dispatcher.dispatch(null, null, null, dispatchErrorHandler, eventRouter, task);
    }

    /*
     * Bridge method (marked bridge/synthetic by the decompiler): exposes the supplier-based
     * sendAndReceive under the Observable return type and delegates to an overload.
     * NOTE(review): the (Object) cast looks like a decompiler artifact - no visible overload
     * takes Object as second argument; confirm against the original source.
     */
    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable sendAndReceive(Object obj, Supplier supplier, Consumer consumer) {
        return sendAndReceive(obj, (Object) supplier, consumer);
    }

    /*
     * Bridge method (marked bridge/synthetic by the decompiler): exposes the event-based
     * sendAndReceive under the Observable return type and delegates to an overload.
     * NOTE(review): the (Object) cast looks like a decompiler artifact - no visible overload
     * takes Object as second argument; confirm against the original source.
     */
    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable sendAndReceive(Object obj, Event event, Consumer consumer) {
        return sendAndReceive(obj, (Object) event, consumer);
    }

    /*
     * Bridge method (marked bridge/synthetic by the decompiler): exposes the supplier-based
     * send with an explicit reply observable under the Observable return type.
     * NOTE(review): the (Object) cast looks like a decompiler artifact - no visible overload
     * takes Object as second argument; confirm against the original source.
     */
    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable send(Object obj, Supplier supplier, Observable observable) {
        return send(obj, (Object) supplier, observable);
    }

    /*
     * Bridge method (marked bridge/synthetic by the decompiler): exposes the event-based
     * send with an explicit reply observable under the Observable return type.
     * NOTE(review): the (Object) cast looks like a decompiler artifact - no visible overload
     * takes Object as second argument; confirm against the original source.
     */
    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable send(Object obj, Event event, Observable observable) {
        return send(obj, (Object) event, observable);
    }

    /*
     * Bridge method (marked bridge/synthetic by the decompiler): exposes the supplier-based
     * send under the Observable return type.
     * NOTE(review): the (Object) cast looks like a decompiler artifact - no visible overload
     * takes Object as second argument; confirm against the original source.
     */
    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable send(Object obj, Supplier supplier) {
        return send(obj, (Object) supplier);
    }

    /*
     * Bridge method (marked bridge/synthetic by the decompiler): exposes the event-based
     * send under the Observable return type.
     * NOTE(review): the (Object) cast looks like a decompiler artifact - no visible overload
     * takes Object as second argument; confirm against the original source.
     */
    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable send(Object obj, Event event) {
        return send(obj, (Object) event);
    }

    /*
     * Bridge method (marked bridge/synthetic by the decompiler): exposes the supplier-based
     * notify under the Observable return type.
     * NOTE(review): with the (Object) cast this call would select notify(Object) rather than
     * a two-argument overload - presumably a decompiler artifact; confirm against the
     * original source.
     */
    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable notify(Object obj, Supplier supplier) {
        return notify(obj, (Object) supplier);
    }

    /*
     * Bridge method (marked bridge/synthetic by the decompiler): exposes the event-based
     * notify under the Observable return type.
     * NOTE(review): the (Object) cast looks like a decompiler artifact - no visible
     * two-argument overload takes Object as second argument; confirm against the original source.
     */
    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable notify(Object obj, Event event) {
        return notify(obj, (Object) event);
    }

    /*
     * Bridge method (marked bridge/synthetic by the decompiler): exposes the three-argument
     * notify under the Observable return type.
     * NOTE(review): the (Object) and (Consumer<Object>) casts look like decompiler artifacts -
     * no visible overload takes Object as the event argument; confirm against the original source.
     */
    @Override // reactor.core.Observable
    public /* bridge */ /* synthetic */ Observable notify(Object obj, Event event, Consumer consumer) {
        return notify(obj, (Object) event, (Consumer<Object>) consumer);
    }
}
