package de.dfki.delight.feature;

import de.dfki.delight.common.ParameterConversionManager;
import de.dfki.delight.common.ParameterInfo;
import de.dfki.delight.common.ParameterValue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.ws.rs.sse.SseEventSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/dfki/delight/feature/InboundEventStream.class */
/**
 * Client-side {@link EventStream} that receives its elements from an {@link SseEventSource}.
 *
 * <p>Every inbound event's data is read as a string and converted into the API representation
 * {@code T} through the configured {@link ParameterConversionManager} and element
 * {@link ParameterInfo}.
 *
 * <p>The stream can be consumed in one of two ways, and whichever is used first wins because both
 * share the same "has been started" flag:
 * <ul>
 *   <li><b>push</b>: {@link #register(Consumer, Consumer, Runnable)} followed by
 *       {@link #startListening()} delivers every event to the registered listener;</li>
 *   <li><b>pull</b>: {@link #pullEvent(long, TimeUnit)} buffers events in an internal queue and
 *       hands them out one at a time.</li>
 * </ul>
 *
 * <p>NOTE(review): {@link #close()} only closes the underlying event source; {@link #isClosed()}
 * turns {@code true} only once the completion handler has run. Whether closing the source triggers
 * that handler depends on the {@code SseEventSource} implementation - confirm.
 *
 * @param <T> API representation type of the stream elements
 */
public class InboundEventStream<T> implements EventStream<T> {
    private static final Logger LOG = LoggerFactory.getLogger(InboundEventStream.class);

    private final SseEventSource eventSource;
    private final ParameterConversionManager parameterConversionManager;
    private final ParameterInfo elementTypeInfo;

    /** Events buffered for {@link #pullEvent(long, TimeUnit)}; unused in push mode. */
    private final BlockingQueue<T> receivedEventQueue = new LinkedBlockingQueue<>();
    /** Set once the event source has been wired up, by either consumption mode. */
    private final AtomicBoolean hasBeenStarted = new AtomicBoolean(false);
    /** Set by the completion handler built in {@link #makeCompletionHandler(Runnable)}. */
    private final AtomicBoolean isCompleted = new AtomicBoolean(false);

    /** Receiver of converted events in push mode; {@code null} until {@code register} is called. */
    private Consumer<T> listener;
    /** Default error handler: logs the failure. Replaced by {@code register}. */
    private Consumer<Throwable> errorHandler = th -> {
        LOG.error("Error receiving events: ", th);
    };
    /** Default completion handler: marks the stream completed and logs. Replaced by {@code register}. */
    private Runnable completionHandler = makeCompletionHandler(() -> {
        LOG.info("{} has been completed", this);
    });

    /**
     * Creates a stream on top of the given event source. The source is neither registered on nor
     * opened until {@link #startListening()} or {@link #pullEvent(long, TimeUnit)} is called.
     *
     * @param sseEventSource             source of the server-sent events
     * @param parameterInfo              type information of the stream elements, used for conversion
     * @param parameterConversionManager converter turning event data into the API representation
     */
    public InboundEventStream(SseEventSource sseEventSource, ParameterInfo parameterInfo, ParameterConversionManager parameterConversionManager) {
        this.eventSource = sseEventSource;
        this.elementTypeInfo = parameterInfo;
        this.parameterConversionManager = parameterConversionManager;
    }

    /** @return the underlying server-sent-event source */
    public SseEventSource getEventSource() {
        return this.eventSource;
    }

    /** @return the converter used to turn event data into the API representation */
    public ParameterConversionManager getParameterConversionManager() {
        return this.parameterConversionManager;
    }

    /** @return the type information of the stream elements */
    public ParameterInfo getElementTypeInfo() {
        return this.elementTypeInfo;
    }

    /** @return the (live, mutable) flag telling whether the event source has been wired up */
    public AtomicBoolean hasBeenStarted() {
        return this.hasBeenStarted;
    }

    /**
     * Wraps a completion callback so that the stream is flagged as completed before the callback
     * itself runs.
     *
     * @param runnable callback to invoke after the completed flag has been set
     * @return the wrapping completion handler
     */
    protected Runnable makeCompletionHandler(final Runnable runnable) {
        return () -> {
            this.isCompleted.set(true);
            runnable.run();
        };
    }

    /**
     * Not supported by an inbound stream.
     *
     * @throws IllegalStateException always
     */
    @Override // de.dfki.delight.feature.EventSink
    public void generatorFn(EventStream<T> eventStream) {
        throw new IllegalStateException("generatorFn");
    }

    /**
     * Converts the raw data of an event into its API representation.
     *
     * @param str event data as read from the inbound event
     * @return the converted element
     */
    // The conversion manager is not generic in T; the element type info is what ties the
    // result to T, so the cast cannot be checked by the compiler.
    @SuppressWarnings("unchecked")
    protected T transformEventToApiRep(String str) {
        ParameterInfo typeInfo = getElementTypeInfo();
        return (T) getParameterConversionManager().convertToApiRepresentation(typeInfo, new ParameterValue(typeInfo, str));
    }

    /**
     * Registers the push-mode callbacks with a no-op completion callback (the stream is still
     * flagged as completed when the source completes).
     *
     * @param consumer  receiver of converted events
     * @param consumer2 receiver of errors reported by the event source
     */
    @Override // de.dfki.delight.feature.EventSource
    public void register(Consumer<T> consumer, Consumer<Throwable> consumer2) {
        this.listener = consumer;
        this.errorHandler = consumer2;
        this.completionHandler = makeCompletionHandler(() -> {
        });
    }

    /**
     * Registers the push-mode callbacks. The error and completion handlers are handed to the event
     * source when the stream is started, so they must be registered before
     * {@link #startListening()} or {@link #pullEvent(long, TimeUnit)} to take effect.
     *
     * @param consumer  receiver of converted events
     * @param consumer2 receiver of errors reported by the event source
     * @param runnable  callback invoked once the stream has completed
     */
    @Override // de.dfki.delight.feature.EventSource
    public void register(Consumer<T> consumer, Consumer<Throwable> consumer2, Runnable runnable) {
        this.listener = consumer;
        this.errorHandler = consumer2;
        this.completionHandler = makeCompletionHandler(runnable);
    }

    /**
     * Starts push-mode delivery: wires the registered callbacks to the event source and opens it.
     * Does nothing (besides logging) if the stream has already been started in either mode.
     */
    @Override // de.dfki.delight.feature.EventSource
    public synchronized void startListening() {
        if (hasBeenStarted().getAndSet(true)) {
            LOG.debug("event stream already started ({})", getEventSource());
            return;
        }
        LOG.debug("started to listen to events ({})", getEventSource());
        this.isCompleted.set(false);
        getEventSource().register(inboundSseEvent -> {
            String readData = inboundSseEvent.readData();
            LOG.debug("Read: {}", readData);
            // The listener field is read per event, so it reflects the latest register(...) call.
            this.listener.accept(transformEventToApiRep(readData));
        }, this.errorHandler, this.completionHandler);
        getEventSource().open();
    }

    /**
     * Retrieves the next buffered event, waiting up to the given timeout for one to arrive. The
     * first call switches the stream into pull mode (unless it has already been started).
     *
     * <p>Only the one-time initialisation runs under the instance lock; waiting for an event does
     * not block {@link #startListening()} or other pulling threads.
     *
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return the next event, or {@code null} if none arrived within the timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // de.dfki.delight.feature.EventSource
    public T pullEvent(long j, TimeUnit timeUnit) throws InterruptedException {
        LOG.debug("pullEvent(timeout={}, unit={})", j, timeUnit);
        ensureQueueingStarted();
        return this.receivedEventQueue.poll(j, timeUnit);
    }

    /** Wires the event source to the internal queue and opens it, unless already started. */
    private synchronized void ensureQueueingStarted() {
        if (this.hasBeenStarted.getAndSet(true)) {
            return;
        }
        this.receivedEventQueue.clear();
        getEventSource().register(inboundSseEvent -> {
            try {
                this.receivedEventQueue.put(transformEventToApiRep(inboundSseEvent.readData()));
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the delivering thread can still observe it.
                Thread.currentThread().interrupt();
                LOG.debug("putting an element to the event queue was interrupted ....");
            }
        }, this.errorHandler, this.completionHandler);
        getEventSource().open();
    }

    /** @return {@code true} once the completion handler has run */
    @Override // de.dfki.delight.feature.EventStream
    public boolean isClosed() {
        return this.isCompleted.get();
    }

    /** Discards all buffered events and closes the underlying event source. */
    @Override // de.dfki.delight.feature.EventStream, java.lang.AutoCloseable
    public void close() {
        SseEventSource source = getEventSource();
        LOG.debug("closing event stream ({}), isOpen: {}", source, source.isOpen());
        this.receivedEventQueue.clear();
        source.close();
    }
}
