package io.moquette.spi.impl;

import io.moquette.server.ConnectionDescriptorStore;
import io.moquette.spi.ClientSession;
import io.moquette.spi.IMessagesStore;
import io.moquette.spi.ISessionsStore;
import io.moquette.spi.impl.subscriptions.Subscription;
import io.moquette.spi.impl.subscriptions.SubscriptionsDirectory;
import io.moquette.spi.impl.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/moquette/spi/impl/MessagesPublisher.class */
/**
 * Delivers a stored PUBLISH message to every subscription that matches its topic.
 *
 * <p>For each matching subscription the message is either sent right away through the
 * {@link PersistentQueueMessageSender} (when the subscriber's client id is reported as
 * connected) or enqueued on the subscriber's {@link ClientSession} (when the client is not
 * connected and its session is not a clean session).
 */
public class MessagesPublisher {
    private static final Logger LOG = LoggerFactory.getLogger(MessagesPublisher.class);

    private final ConnectionDescriptorStore connectionDescriptors;
    private final ISessionsStore sessionsStore;
    private final PersistentQueueMessageSender messageSender;
    private final SubscriptionsDirectory subscriptions;

    /**
     * Creates a publisher wired to its collaborators.
     *
     * @param connectionDescriptorStore used to check whether a subscriber is currently connected
     * @param iSessionsStore used to look up the {@link ClientSession} of a subscriber
     * @param persistentQueueMessageSender used to send PUBLISH messages to connected subscribers
     * @param subscriptionsDirectory used to find the subscriptions matching a topic
     */
    public MessagesPublisher(ConnectionDescriptorStore connectionDescriptorStore, ISessionsStore iSessionsStore, PersistentQueueMessageSender persistentQueueMessageSender, SubscriptionsDirectory subscriptionsDirectory) {
        this.connectionDescriptors = connectionDescriptorStore;
        this.sessionsStore = iSessionsStore;
        this.messageSender = persistentQueueMessageSender;
        this.subscriptions = subscriptionsDirectory;
    }

    /**
     * Builds a PUBLISH message with the retain flag cleared and a message id of {@code 0}.
     *
     * @param str topic name placed in the variable header
     * @param mqttQoS QoS level placed in the fixed header
     * @param byteBuf payload of the message
     * @return the assembled PUBLISH message
     */
    static MqttPublishMessage notRetainedPublish(String str, MqttQoS mqttQoS, ByteBuf byteBuf) {
        return notRetainedPublishWithMessageId(str, mqttQoS, byteBuf, 0);
    }

    /**
     * Builds a PUBLISH message with the retain flag cleared and the given message id.
     *
     * @param topicName topic name placed in the variable header
     * @param qos QoS level placed in the fixed header
     * @param payload payload of the message
     * @param messageId message id placed in the variable header
     * @return the assembled PUBLISH message
     */
    private static MqttPublishMessage notRetainedPublishWithMessageId(String topicName, MqttQoS qos, ByteBuf payload, int messageId) {
        // Fixed header arguments: type, dup flag, QoS, retain flag, remaining length.
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0);
        MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topicName, messageId);
        return new MqttPublishMessage(fixedHeader, variableHeader, payload);
    }

    /**
     * Logs the outgoing publish and then forwards it to all matching subscribers.
     *
     * <p>At TRACE level the log line also carries the payload and a dump of the subscription
     * tree; otherwise a shorter line is written at INFO level. The decompiler reports that this
     * method was package-private in the original class.
     *
     * @param storedMessage the message to deliver
     * @param topic the topic used to look up matching subscriptions
     * @param i the message id, used for logging only
     */
    public void publish2Subscribers(IMessagesStore.StoredMessage storedMessage, Topic topic, int i) {
        if (!LOG.isTraceEnabled()) {
            LOG.info("Sending publish message to subscribers. ClientId={}, topic={}, messageId={}",
                    storedMessage.getClientID(), topic, i);
        } else {
            // The payload rendering and tree dump are only computed when TRACE is active.
            LOG.trace("Sending publish message to subscribers. ClientId={}, topic={}, messageId={}, payload={}, subscriptionTree={}",
                    storedMessage.getClientID(), topic, i,
                    DebugUtils.payload2Str(storedMessage.getPayload()), this.subscriptions.dumpTree());
        }
        publish2Subscribers(storedMessage, topic);
    }

    /**
     * Delivers the message to every subscription matching {@code topic}.
     *
     * <p>The QoS used for each subscriber is the result of
     * {@code ProtocolProcessor.lowerQosToTheSubscriptionDesired}. Connected subscribers get the
     * message sent immediately; subscribers that are not connected get it enqueued on their
     * session unless that session is a clean session, in which case nothing is done for them.
     * The decompiler reports that this method was package-private in the original class.
     *
     * @param storedMessage the message to deliver
     * @param topic the topic used to look up matching subscriptions
     */
    public void publish2Subscribers(IMessagesStore.StoredMessage storedMessage, Topic topic) {
        final List<Subscription> matchingSubscriptions = this.subscriptions.matches(topic);
        final String topicName = storedMessage.getTopic();
        final MqttQoS publishedQos = storedMessage.getQos();
        final ByteBuf sourcePayload = storedMessage.getPayload();

        for (Subscription target : matchingSubscriptions) {
            final MqttQoS grantedQos = ProtocolProcessor.lowerQosToTheSubscriptionDesired(target, publishedQos);
            final ClientSession session = this.sessionsStore.sessionForClient(target.getClientId());
            final boolean online = this.connectionDescriptors.isConnected(target.getClientId());

            if (!online) {
                if (session.isCleanSession()) {
                    // Nothing is kept for a disconnected client with a clean session.
                    continue;
                }
                LOG.debug("Storing pending PUBLISH inactive message. CId={}, topicFilter={}, qos={}",
                        target.getClientId(), target.getTopicFilter(), grantedQos);
                session.enqueue(storedMessage);
                continue;
            }

            LOG.debug("Sending PUBLISH message to active subscriber. CId={}, topicFilter={}, qos={}",
                    target.getClientId(), target.getTopicFilter(), grantedQos);
            // Each subscriber receives its own retained duplicate of the stored payload.
            final ByteBuf payloadCopy = sourcePayload.retainedDuplicate();
            final MqttPublishMessage outbound;
            if (grantedQos == MqttQoS.AT_MOST_ONCE) {
                outbound = notRetainedPublish(topicName, grantedQos, payloadCopy);
            } else {
                // Above QoS 0 the message id comes from registering the message with the session.
                final int messageId = session.inFlightAckWaiting(storedMessage);
                outbound = notRetainedPublishWithMessageId(topicName, grantedQos, payloadCopy, messageId);
            }
            this.messageSender.sendPublish(session, outbound);
        }
    }
}
