package reactor.core.action;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import reactor.core.Observable;
import reactor.event.Event;
import reactor.event.lifecycle.Pausable;
import reactor.event.registry.Registration;
import reactor.function.Consumer;
import reactor.timer.Timer;

/* loaded from: input_file:reactor/core/action/WindowAction.class */
/**
 * An {@link Action} that buffers the data of every accepted event and publishes the
 * buffered values as a single list.
 *
 * <p>A window is published whenever the callback scheduled on the supplied {@link Timer}
 * fires, or when {@link #flush()} is called. Nothing is published while the buffer is empty.
 * After a window has been published the buffer is emptied.
 *
 * <p>All access to the buffer is guarded by an internal {@link ReentrantLock}.
 *
 * @param <T> type of the event data collected into each window
 */
public class WindowAction<T> extends Action<T> implements Pausable, Flushable<T> {
    /** Guards every read and write of {@link #collectedWindow}. */
    private final ReentrantLock lock;

    /** Values accepted since the last published window. */
    private final List<T> collectedWindow;

    /** Handle on the scheduled timer callback; used by cancel/pause/resume. */
    private final Registration<? extends Consumer<Long>> timerRegistration;

    /**
     * Creates the action and schedules the callback that publishes windows.
     *
     * @param observable forwarded unchanged to the {@link Action} constructor
     * @param obj        forwarded unchanged to the {@link Action} constructor
     * @param obj2       forwarded unchanged to the {@link Action} constructor
     * @param timer      timer on which the windowing callback is scheduled
     * @param i          second argument of {@code Timer.schedule}
     *                   (presumably the period - TODO confirm against {@code Timer})
     * @param timeUnit   third argument of {@code Timer.schedule}
     * @param i2         fourth argument of {@code Timer.schedule}
     *                   (presumably the initial delay - TODO confirm against {@code Timer})
     */
    public WindowAction(Observable observable, Object obj, Object obj2, Timer timer, int i, TimeUnit timeUnit, int i2) {
        super(observable, obj, obj2);
        // The lock and the buffer are assigned before scheduling because the
        // callback reads both of them through doWindow.
        this.lock = new ReentrantLock();
        this.collectedWindow = new ArrayList<T>();
        this.timerRegistration = timer.schedule(new Consumer<Long>() {
            @Override
            public void accept(Long l) {
                WindowAction.this.doWindow(l);
            }
        }, i, timeUnit, i2);
    }

    /**
     * Publishes the buffered values, if any, as one list and then empties the buffer.
     *
     * <p>NOTE(review): {@code notifyValue} is invoked while the lock is held and the buffer
     * is cleared afterwards. If a downstream consumer runs synchronously on this thread and
     * feeds a value back into this action, that value would be removed by the
     * {@code clear()} below without ever being published - confirm the dispatch model.
     *
     * @param l value supplied by the timer callback, or {@code -1} from {@link #flush()};
     *          not used by this implementation
     */
    protected void doWindow(Long l) {
        this.lock.lock();
        try {
            if (!this.collectedWindow.isEmpty()) {
                // Publish a copy so that clearing the buffer does not empty
                // the list that was handed to notifyValue.
                notifyValue(Event.wrap(new ArrayList<T>(this.collectedWindow)));
                this.collectedWindow.clear();
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Appends the data of the given event to the current window.
     *
     * @param event event whose data is buffered
     */
    @Override
    public void doAccept(Event<T> event) {
        this.lock.lock();
        try {
            this.collectedWindow.add(event.getData());
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Cancels the timer registration. Values still held in the buffer are not published
     * by this call.
     *
     * @return this action
     */
    @Override
    public Pausable cancel() {
        this.timerRegistration.cancel();
        return this;
    }

    /**
     * Pauses the timer registration.
     *
     * @return this action
     */
    @Override
    public Pausable pause() {
        this.timerRegistration.pause();
        return this;
    }

    /**
     * Resumes the timer registration.
     *
     * @return this action
     */
    @Override
    public Pausable resume() {
        this.timerRegistration.resume();
        return this;
    }

    /**
     * Publishes the current window immediately instead of waiting for the timer callback.
     *
     * @return this action
     */
    @Override
    public Flushable<T> flush() {
        doWindow(-1L);
        return this;
    }
}
