package de.dfki.delight.feature;

import de.dfki.delight.common.ParameterConversionManager;
import de.dfki.delight.common.ParameterInfo;
import de.dfki.delight.common.ParameterValue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import javax.ws.rs.sse.SseEventSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/dfki/delight/feature/InboundEventStream.class */
public class InboundEventStream<T> implements EventStream<T> {
    private static final Logger LOG = LoggerFactory.getLogger(InboundEventStream.class);
    private SseEventSource eventSource;
    private Consumer<T> listener;
    private ParameterConversionManager parameterConversionManager;
    private ParameterInfo elementTypeInfo;
    private Consumer<Throwable> errorHandler = th -> {
        LOG.error("Error receiving events: ", th);
    };
    private Runnable completionHandler = () -> {
    };
    private BlockingQueue<Object> receivedEventQueue = new LinkedBlockingQueue();

    public SseEventSource getEventSource() {
        return this.eventSource;
    }

    public ParameterConversionManager getParameterConversionManager() {
        return this.parameterConversionManager;
    }

    public ParameterInfo getElementTypeInfo() {
        return this.elementTypeInfo;
    }

    public InboundEventStream(SseEventSource sseEventSource, ParameterInfo parameterInfo, ParameterConversionManager parameterConversionManager) {
        this.eventSource = sseEventSource;
        this.elementTypeInfo = parameterInfo;
        this.parameterConversionManager = parameterConversionManager;
    }

    @Override // de.dfki.delight.feature.EventSink
    public void generatorFn(EventStream<T> eventStream) {
        throw new IllegalStateException("generatorFn");
    }

    protected T transformEventToApiRep(String str) {
        return (T) getParameterConversionManager().convertToApiRepresentation(getElementTypeInfo(), new ParameterValue(getElementTypeInfo(), str));
    }

    @Override // de.dfki.delight.feature.EventSource
    public void register(Consumer<T> consumer, Consumer<Throwable> consumer2) {
        this.listener = consumer;
        this.errorHandler = consumer2;
        this.completionHandler = () -> {
        };
    }

    @Override // de.dfki.delight.feature.EventSource
    public void register(Consumer<T> consumer, Consumer<Throwable> consumer2, Runnable runnable) {
        this.listener = consumer;
        this.errorHandler = consumer2;
        this.completionHandler = runnable;
    }

    @Override // de.dfki.delight.feature.EventSource
    public void startListening() {
        LOG.debug("started to listen to events ({})", getEventSource());
        getEventSource().register(inboundSseEvent -> {
            String readData = inboundSseEvent.readData();
            System.out.println("Read: " + readData);
            this.listener.accept(transformEventToApiRep(readData));
        }, this.errorHandler, this.completionHandler);
        getEventSource().open();
    }

    @Override // de.dfki.delight.feature.EventSource
    public T pullEvent() throws InterruptedException {
        if (!getEventSource().isOpen()) {
            this.receivedEventQueue.clear();
            getEventSource().register(inboundSseEvent -> {
                try {
                    this.receivedEventQueue.put(transformEventToApiRep(inboundSseEvent.readData()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, this.errorHandler, this.completionHandler);
            getEventSource().open();
        }
        return (T) this.receivedEventQueue.take();
    }

    @Override // de.dfki.delight.feature.EventStream, java.lang.AutoCloseable
    public void close() {
        LOG.debug("stopped listening to events ({})", getEventSource());
        getEventSource().close();
        this.receivedEventQueue.clear();
    }
}
