package io.moquette.spi.impl;

import io.moquette.interception.InterceptHandler;
import io.moquette.interception.messages.InterceptAcknowledgedMessage;
import io.moquette.server.ConnectionDescriptor;
import io.moquette.server.ConnectionDescriptorStore;
import io.moquette.server.netty.NettyUtils;
import io.moquette.spi.ClientSession;
import io.moquette.spi.IMessagesStore;
import io.moquette.spi.ISessionsStore;
import io.moquette.spi.ISubscriptionsStore;
import io.moquette.spi.impl.subscriptions.Subscription;
import io.moquette.spi.impl.subscriptions.SubscriptionsDirectory;
import io.moquette.spi.impl.subscriptions.Topic;
import io.moquette.spi.security.IAuthenticator;
import io.moquette.spi.security.IAuthorizator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.timeout.IdleStateHandler;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/moquette/spi/impl/ProtocolProcessor.class */
public class ProtocolProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(ProtocolProcessor.class);
    protected ConnectionDescriptorStore connectionDescriptors;
    protected ConcurrentMap<RunningSubscription, SubscriptionState> subscriptionInCourse;
    private SubscriptionsDirectory subscriptions;
    private ISubscriptionsStore subscriptionStore;
    private boolean allowAnonymous;
    private boolean allowZeroByteClientId;
    private IAuthorizator m_authorizator;
    private IMessagesStore m_messagesStore;
    private ISessionsStore m_sessionsStore;
    private IAuthenticator m_authenticator;
    private BrokerInterceptor m_interceptor;
    private Qos0PublishHandler qos0PublishHandler;
    private Qos1PublishHandler qos1PublishHandler;
    private Qos2PublishHandler qos2PublishHandler;
    private MessagesPublisher messagesPublisher;
    private InternalRepublisher internalRepublisher;
    private ConcurrentMap<String, WillMessage> m_willStore = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.moquette.spi.impl.ProtocolProcessor$1, reason: invalid class name */
    /* loaded from: input_file:io/moquette/spi/impl/ProtocolProcessor$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS = new int[MqttQoS.values().length];

        static {
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.AT_MOST_ONCE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.AT_LEAST_ONCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.EXACTLY_ONCE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/moquette/spi/impl/ProtocolProcessor$RunningSubscription.class */
    public class RunningSubscription {
        final String clientID;
        final long packetId;

        RunningSubscription(String str, long j) {
            this.clientID = str;
            this.packetId = j;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            RunningSubscription runningSubscription = (RunningSubscription) obj;
            return this.packetId == runningSubscription.packetId && (this.clientID == null ? runningSubscription.clientID == null : this.clientID.equals(runningSubscription.clientID));
        }

        public int hashCode() {
            return (31 * (this.clientID != null ? this.clientID.hashCode() : 0)) + ((int) (this.packetId ^ (this.packetId >>> 32)));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/moquette/spi/impl/ProtocolProcessor$SubscriptionState.class */
    public enum SubscriptionState {
        STORED,
        VERIFIED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/moquette/spi/impl/ProtocolProcessor$WillMessage.class */
    public static final class WillMessage {
        private final String topic;
        private final ByteBuffer payload;
        private final boolean retained;
        private final MqttQoS qos;

        WillMessage(String str, ByteBuffer byteBuffer, boolean z, MqttQoS mqttQoS) {
            this.topic = str;
            this.payload = byteBuffer;
            this.retained = z;
            this.qos = mqttQoS;
        }

        public String getTopic() {
            return this.topic;
        }

        public ByteBuffer getPayload() {
            return this.payload;
        }

        public boolean isRetained() {
            return this.retained;
        }

        public MqttQoS getQos() {
            return this.qos;
        }
    }

    public void init(SubscriptionsDirectory subscriptionsDirectory, IMessagesStore iMessagesStore, ISessionsStore iSessionsStore, IAuthenticator iAuthenticator, boolean z, IAuthorizator iAuthorizator, BrokerInterceptor brokerInterceptor) {
        init(subscriptionsDirectory, iMessagesStore, iSessionsStore, iAuthenticator, z, false, iAuthorizator, brokerInterceptor, null);
    }

    public void init(SubscriptionsDirectory subscriptionsDirectory, IMessagesStore iMessagesStore, ISessionsStore iSessionsStore, IAuthenticator iAuthenticator, boolean z, boolean z2, IAuthorizator iAuthorizator, BrokerInterceptor brokerInterceptor) {
        init(subscriptionsDirectory, iMessagesStore, iSessionsStore, iAuthenticator, z, z2, iAuthorizator, brokerInterceptor, null);
    }

    public void init(SubscriptionsDirectory subscriptionsDirectory, IMessagesStore iMessagesStore, ISessionsStore iSessionsStore, IAuthenticator iAuthenticator, boolean z, boolean z2, IAuthorizator iAuthorizator, BrokerInterceptor brokerInterceptor, String str) {
        init(new ConnectionDescriptorStore(iSessionsStore), subscriptionsDirectory, iMessagesStore, iSessionsStore, iAuthenticator, z, z2, iAuthorizator, brokerInterceptor, str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void init(ConnectionDescriptorStore connectionDescriptorStore, SubscriptionsDirectory subscriptionsDirectory, IMessagesStore iMessagesStore, ISessionsStore iSessionsStore, IAuthenticator iAuthenticator, boolean z, boolean z2, IAuthorizator iAuthorizator, BrokerInterceptor brokerInterceptor, String str) {
        LOG.info("Initializing MQTT protocol processor...");
        this.connectionDescriptors = connectionDescriptorStore;
        this.subscriptionInCourse = new ConcurrentHashMap();
        this.m_interceptor = brokerInterceptor;
        this.subscriptions = subscriptionsDirectory;
        this.allowAnonymous = z;
        this.allowZeroByteClientId = z2;
        this.m_authorizator = iAuthorizator;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Initial subscriptions tree={}", subscriptionsDirectory.dumpTree());
        }
        this.m_authenticator = iAuthenticator;
        this.m_messagesStore = iMessagesStore;
        this.m_sessionsStore = iSessionsStore;
        this.subscriptionStore = iSessionsStore.subscriptionStore();
        LOG.info("Initializing messages publisher...");
        PersistentQueueMessageSender persistentQueueMessageSender = new PersistentQueueMessageSender(this.connectionDescriptors);
        this.messagesPublisher = new MessagesPublisher(connectionDescriptorStore, iSessionsStore, persistentQueueMessageSender, subscriptionsDirectory);
        LOG.info("Initializing QoS publish handlers...");
        this.qos0PublishHandler = new Qos0PublishHandler(this.m_authorizator, this.m_messagesStore, this.m_interceptor, this.messagesPublisher);
        this.qos1PublishHandler = new Qos1PublishHandler(this.m_authorizator, this.m_messagesStore, this.m_interceptor, this.connectionDescriptors, this.messagesPublisher);
        this.qos2PublishHandler = new Qos2PublishHandler(this.m_authorizator, subscriptionsDirectory, this.m_messagesStore, this.m_interceptor, this.connectionDescriptors, this.m_sessionsStore, this.messagesPublisher);
        LOG.info("Initializing internal republisher...");
        this.internalRepublisher = new InternalRepublisher(persistentQueueMessageSender);
    }

    public void processConnect(Channel channel, MqttConnectMessage mqttConnectMessage) {
        MqttConnectPayload payload = mqttConnectMessage.payload();
        String clientIdentifier = payload.clientIdentifier();
        LOG.info("Processing CONNECT message. CId={}, username={}", clientIdentifier, payload.userName());
        if (mqttConnectMessage.variableHeader().version() != MqttVersion.MQTT_3_1.protocolLevel() && mqttConnectMessage.variableHeader().version() != MqttVersion.MQTT_3_1_1.protocolLevel()) {
            MqttConnAckMessage connAck = connAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
            LOG.error("MQTT protocol version is not valid. CId={}", clientIdentifier);
            channel.writeAndFlush(connAck);
            channel.close();
            return;
        }
        if (clientIdentifier == null || clientIdentifier.length() == 0) {
            if (!mqttConnectMessage.variableHeader().isCleanSession() || !this.allowZeroByteClientId) {
                channel.writeAndFlush(connAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED));
                channel.close();
                LOG.error("The MQTT client ID cannot be empty. Username={}", payload.userName());
                return;
            }
            clientIdentifier = UUID.randomUUID().toString().replace("-", "");
            LOG.info("Client has connected with a server generated identifier. CId={}, username={}", clientIdentifier, payload.userName());
        }
        if (!login(channel, mqttConnectMessage, clientIdentifier)) {
            channel.close();
            return;
        }
        ConnectionDescriptor connectionDescriptor = new ConnectionDescriptor(clientIdentifier, channel, mqttConnectMessage.variableHeader().isCleanSession());
        ConnectionDescriptor addConnection = this.connectionDescriptors.addConnection(connectionDescriptor);
        if (addConnection != null) {
            LOG.info("The client ID is being used in an existing connection. It will be closed. CId={}", clientIdentifier);
            addConnection.abort();
            return;
        }
        initializeKeepAliveTimeout(channel, mqttConnectMessage, clientIdentifier);
        storeWillMessage(mqttConnectMessage, clientIdentifier);
        if (!sendAck(connectionDescriptor, mqttConnectMessage, clientIdentifier)) {
            channel.close();
            return;
        }
        this.m_interceptor.notifyClientConnected(mqttConnectMessage);
        ClientSession createOrLoadClientSession = createOrLoadClientSession(connectionDescriptor, mqttConnectMessage, clientIdentifier);
        if (createOrLoadClientSession == null) {
            channel.close();
        } else {
            if (!republish(connectionDescriptor, mqttConnectMessage, createOrLoadClientSession)) {
                channel.close();
                return;
            }
            if (!connectionDescriptor.assignState(ConnectionDescriptor.ConnectionState.MESSAGES_REPUBLISHED, ConnectionDescriptor.ConnectionState.ESTABLISHED)) {
                channel.close();
            }
            LOG.info("The CONNECT message has been processed. CId={}, username={}", clientIdentifier, payload.userName());
        }
    }

    private MqttConnAckMessage connAck(MqttConnectReturnCode mqttConnectReturnCode) {
        return connAck(mqttConnectReturnCode, false);
    }

    private MqttConnAckMessage connAckWithSessionPresent(MqttConnectReturnCode mqttConnectReturnCode) {
        return connAck(mqttConnectReturnCode, true);
    }

    private MqttConnAckMessage connAck(MqttConnectReturnCode mqttConnectReturnCode, boolean z) {
        return new MqttConnAckMessage(new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(mqttConnectReturnCode, z));
    }

    private boolean login(Channel channel, MqttConnectMessage mqttConnectMessage, String str) {
        if (!mqttConnectMessage.variableHeader().hasUserName()) {
            if (this.allowAnonymous) {
                return true;
            }
            LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", str);
            failedCredentials(channel);
            return false;
        }
        byte[] bArr = null;
        if (mqttConnectMessage.variableHeader().hasPassword()) {
            bArr = mqttConnectMessage.payload().password().getBytes();
        } else if (!this.allowAnonymous) {
            LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", str);
            failedCredentials(channel);
            return false;
        }
        if (this.m_authenticator.checkValid(str, mqttConnectMessage.payload().userName(), bArr)) {
            NettyUtils.userName(channel, mqttConnectMessage.payload().userName());
            return true;
        }
        LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}, password={}", new Object[]{str, mqttConnectMessage.payload().userName(), bArr});
        failedCredentials(channel);
        return false;
    }

    private boolean sendAck(ConnectionDescriptor connectionDescriptor, MqttConnectMessage mqttConnectMessage, String str) {
        LOG.info("Sending connect ACK. CId={}", str);
        if (!connectionDescriptor.assignState(ConnectionDescriptor.ConnectionState.DISCONNECTED, ConnectionDescriptor.ConnectionState.SENDACK)) {
            return false;
        }
        ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(str);
        boolean z = sessionForClient != null;
        MqttConnAckMessage connAck = (mqttConnectMessage.variableHeader().isCleanSession() || !z) ? connAck(MqttConnectReturnCode.CONNECTION_ACCEPTED) : connAckWithSessionPresent(MqttConnectReturnCode.CONNECTION_ACCEPTED);
        if (z) {
            LOG.info("Cleaning session. CId={}", str);
            sessionForClient.cleanSession(mqttConnectMessage.variableHeader().isCleanSession());
        }
        connectionDescriptor.writeAndFlush(connAck);
        LOG.info("The connect ACK has been sent. CId={}", str);
        return true;
    }

    private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage mqttConnectMessage, String str) {
        int keepAliveTimeSeconds = mqttConnectMessage.variableHeader().keepAliveTimeSeconds();
        LOG.info("Configuring connection. CId={}", str);
        NettyUtils.keepAlive(channel, keepAliveTimeSeconds);
        NettyUtils.cleanSession(channel, mqttConnectMessage.variableHeader().isCleanSession());
        NettyUtils.clientID(channel, str);
        int round = Math.round(keepAliveTimeSeconds * 1.5f);
        setIdleTime(channel.pipeline(), round);
        LOG.debug("The connection has been configured CId={}, keepAlive={}, cleanSession={}, idleTime={}", new Object[]{str, Integer.valueOf(keepAliveTimeSeconds), Boolean.valueOf(mqttConnectMessage.variableHeader().isCleanSession()), Integer.valueOf(round)});
    }

    private void storeWillMessage(MqttConnectMessage mqttConnectMessage, String str) {
        if (mqttConnectMessage.variableHeader().isWillFlag()) {
            MqttQoS valueOf = MqttQoS.valueOf(mqttConnectMessage.variableHeader().willQos());
            LOG.info("Configuring MQTT last will and testament CId={}, willQos={}, willTopic={}, willRetain={}", new Object[]{str, valueOf, mqttConnectMessage.payload().willTopic(), Boolean.valueOf(mqttConnectMessage.variableHeader().isWillRetain())});
            byte[] bytes = mqttConnectMessage.payload().willMessage().getBytes();
            this.m_willStore.put(str, new WillMessage(mqttConnectMessage.payload().willTopic(), (ByteBuffer) ByteBuffer.allocate(bytes.length).put(bytes).flip(), mqttConnectMessage.variableHeader().isWillRetain(), valueOf));
            LOG.info("MQTT last will and testament has been configured. CId={}", str);
        }
    }

    private ClientSession createOrLoadClientSession(ConnectionDescriptor connectionDescriptor, MqttConnectMessage mqttConnectMessage, String str) {
        if (!connectionDescriptor.assignState(ConnectionDescriptor.ConnectionState.SENDACK, ConnectionDescriptor.ConnectionState.SESSION_CREATED)) {
            return null;
        }
        ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(str);
        if (!(sessionForClient != null)) {
            sessionForClient = this.m_sessionsStore.createNewSession(str, mqttConnectMessage.variableHeader().isCleanSession());
        }
        if (mqttConnectMessage.variableHeader().isCleanSession()) {
            LOG.info("Cleaning session. CId={}", str);
            sessionForClient.cleanSession();
        }
        return sessionForClient;
    }

    private boolean republish(ConnectionDescriptor connectionDescriptor, MqttConnectMessage mqttConnectMessage, ClientSession clientSession) {
        if (!connectionDescriptor.assignState(ConnectionDescriptor.ConnectionState.SESSION_CREATED, ConnectionDescriptor.ConnectionState.MESSAGES_REPUBLISHED)) {
            return false;
        }
        if (!mqttConnectMessage.variableHeader().isCleanSession()) {
            republishStoredInSession(clientSession);
        }
        connectionDescriptor.setupAutoFlusher(500);
        return true;
    }

    private void failedCredentials(Channel channel) {
        channel.writeAndFlush(connAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
        LOG.info("Client {} failed to connect with bad username or password.", channel);
    }

    private void setIdleTime(ChannelPipeline channelPipeline, int i) {
        if (channelPipeline.names().contains("idleStateHandler")) {
            channelPipeline.remove("idleStateHandler");
        }
        channelPipeline.addFirst("idleStateHandler", new IdleStateHandler(i, 0, 0));
    }

    private void republishStoredInSession(ClientSession clientSession) {
        LOG.info("Republishing stored publish events. CId={}", clientSession.clientID);
        Queue<IMessagesStore.StoredMessage> queue = clientSession.queue();
        if (queue.isEmpty()) {
            LOG.info("There are no stored publish events to CId={}", clientSession.clientID);
        } else {
            this.internalRepublisher.publishStored(clientSession, queue);
        }
    }

    public void processPubAck(Channel channel, MqttPubAckMessage mqttPubAckMessage) {
        String clientID = NettyUtils.clientID(channel);
        int messageId = mqttPubAckMessage.variableHeader().messageId();
        String userName = NettyUtils.userName(channel);
        LOG.trace("retrieving inflight for messageID <{}>", Integer.valueOf(messageId));
        IMessagesStore.StoredMessage inFlightAcknowledged = this.m_sessionsStore.sessionForClient(clientID).inFlightAcknowledged(messageId);
        this.m_interceptor.notifyMessageAcknowledged(new InterceptAcknowledgedMessage(inFlightAcknowledged, inFlightAcknowledged.getTopic(), userName, messageId));
    }

    public static IMessagesStore.StoredMessage asStoredMessage(MqttPublishMessage mqttPublishMessage) {
        IMessagesStore.StoredMessage storedMessage = new IMessagesStore.StoredMessage(Utils.readBytesAndRewind(mqttPublishMessage.payload()), mqttPublishMessage.fixedHeader().qosLevel(), mqttPublishMessage.variableHeader().topicName());
        storedMessage.setRetained(mqttPublishMessage.fixedHeader().isRetain());
        return storedMessage;
    }

    private static IMessagesStore.StoredMessage asStoredMessage(WillMessage willMessage) {
        IMessagesStore.StoredMessage storedMessage = new IMessagesStore.StoredMessage(willMessage.getPayload().array(), willMessage.getQos(), willMessage.getTopic());
        storedMessage.setRetained(willMessage.isRetained());
        return storedMessage;
    }

    public void processPublish(Channel channel, MqttPublishMessage mqttPublishMessage) {
        MqttQoS qosLevel = mqttPublishMessage.fixedHeader().qosLevel();
        LOG.info("Processing PUBLISH message. CId={}, topic={}, messageId={}, qos={}", new Object[]{NettyUtils.clientID(channel), mqttPublishMessage.variableHeader().topicName(), Integer.valueOf(mqttPublishMessage.variableHeader().messageId()), qosLevel});
        switch (AnonymousClass1.$SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[qosLevel.ordinal()]) {
            case 1:
                this.qos0PublishHandler.receivedPublishQos0(channel, mqttPublishMessage);
                return;
            case 2:
                this.qos1PublishHandler.receivedPublishQos1(channel, mqttPublishMessage);
                return;
            case 3:
                this.qos2PublishHandler.receivedPublishQos2(channel, mqttPublishMessage);
                return;
            default:
                LOG.error("Unknown QoS-Type:{}", qosLevel);
                return;
        }
    }

    public void internalPublish(MqttPublishMessage mqttPublishMessage, String str) {
        MqttQoS qosLevel = mqttPublishMessage.fixedHeader().qosLevel();
        Topic topic = new Topic(mqttPublishMessage.variableHeader().topicName());
        LOG.info("Sending PUBLISH message. Topic={}, qos={}", topic, qosLevel);
        IMessagesStore.StoredMessage asStoredMessage = asStoredMessage(mqttPublishMessage);
        if (str == null || str.isEmpty()) {
            asStoredMessage.setClientID("BROKER_SELF");
        } else {
            asStoredMessage.setClientID(str);
        }
        this.messagesPublisher.publish2Subscribers(asStoredMessage, topic);
        if (mqttPublishMessage.fixedHeader().isRetain()) {
            if (qosLevel == MqttQoS.AT_MOST_ONCE || mqttPublishMessage.payload().readableBytes() == 0) {
                this.m_messagesStore.cleanRetained(topic);
            } else {
                this.m_messagesStore.storeRetained(topic, asStoredMessage);
            }
        }
    }

    private void forwardPublishWill(WillMessage willMessage, String str) {
        LOG.info("Publishing will message. CId={}, topic={}", str, willMessage.getTopic());
        IMessagesStore.StoredMessage asStoredMessage = asStoredMessage(willMessage);
        asStoredMessage.setClientID(str);
        this.messagesPublisher.publish2Subscribers(asStoredMessage, new Topic(asStoredMessage.getTopic()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static MqttQoS lowerQosToTheSubscriptionDesired(Subscription subscription, MqttQoS mqttQoS) {
        if (mqttQoS.value() > subscription.getRequestedQos().value()) {
            mqttQoS = subscription.getRequestedQos();
        }
        return mqttQoS;
    }

    public void processPubRel(Channel channel, MqttMessage mqttMessage) {
        this.qos2PublishHandler.processPubRel(channel, mqttMessage);
    }

    public void processPubRec(Channel channel, MqttMessage mqttMessage) {
        String clientID = NettyUtils.clientID(channel);
        ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(clientID);
        int messageId = Utils.messageId(mqttMessage);
        sessionForClient.moveInFlightToSecondPhaseAckWaiting(messageId, sessionForClient.inFlightAcknowledged(messageId));
        LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, Integer.valueOf(messageId));
        channel.writeAndFlush(new MqttMessage(new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0), MqttMessageIdVariableHeader.from(messageId)));
    }

    public void processPubComp(Channel channel, MqttMessage mqttMessage) {
        String clientID = NettyUtils.clientID(channel);
        int messageId = Utils.messageId(mqttMessage);
        LOG.debug("Processing PUBCOMP message. CId={}, messageId={}", clientID, Integer.valueOf(messageId));
        IMessagesStore.StoredMessage secondPhaseAcknowledged = this.m_sessionsStore.sessionForClient(clientID).secondPhaseAcknowledged(messageId);
        String userName = NettyUtils.userName(channel);
        this.m_interceptor.notifyMessageAcknowledged(new InterceptAcknowledgedMessage(secondPhaseAcknowledged, secondPhaseAcknowledged.getTopic(), userName, messageId));
    }

    public void processDisconnect(Channel channel) throws InterruptedException {
        String clientID = NettyUtils.clientID(channel);
        LOG.info("Processing DISCONNECT message. CId={}", clientID);
        channel.flush();
        ConnectionDescriptor connection = this.connectionDescriptors.getConnection(clientID);
        if (connection == null) {
            channel.close();
            return;
        }
        if (connection.doesNotUseChannel(channel)) {
            LOG.warn("Another client is using the connection descriptor. Closing connection. CId={}", clientID);
            connection.abort();
            return;
        }
        if (!removeSubscriptions(connection, clientID)) {
            LOG.warn("Unable to remove subscriptions. Closing connection. CId={}", clientID);
            connection.abort();
            return;
        }
        if (!dropStoredMessages(connection, clientID)) {
            LOG.warn("Unable to drop stored messages. Closing connection. CId={}", clientID);
            connection.abort();
            return;
        }
        if (!cleanWillMessageAndNotifyInterceptor(connection, clientID)) {
            LOG.warn("Unable to drop will message. Closing connection. CId={}", clientID);
            connection.abort();
        } else if (!connection.close()) {
            LOG.info("The connection has been closed. CId={}", clientID);
        } else if (this.connectionDescriptors.removeConnection(connection)) {
            LOG.info("The DISCONNECT message has been processed. CId={}", clientID);
        } else {
            LOG.warn("Another descriptor has been inserted. CId={}", clientID);
        }
    }

    private boolean removeSubscriptions(ConnectionDescriptor connectionDescriptor, String str) {
        if (!connectionDescriptor.assignState(ConnectionDescriptor.ConnectionState.ESTABLISHED, ConnectionDescriptor.ConnectionState.SUBSCRIPTIONS_REMOVED)) {
            return false;
        }
        if (!connectionDescriptor.cleanSession) {
            return true;
        }
        LOG.info("Removing saved subscriptions. CId={}", connectionDescriptor.clientID);
        this.subscriptionStore.wipeSubscriptions(str);
        LOG.info("The saved subscriptions have been removed. CId={}", connectionDescriptor.clientID);
        return true;
    }

    private boolean dropStoredMessages(ConnectionDescriptor connectionDescriptor, String str) {
        if (!connectionDescriptor.assignState(ConnectionDescriptor.ConnectionState.SUBSCRIPTIONS_REMOVED, ConnectionDescriptor.ConnectionState.MESSAGES_DROPPED)) {
            return false;
        }
        if (!connectionDescriptor.cleanSession) {
            return true;
        }
        LOG.debug("Removing messages of session. CId={}", connectionDescriptor.clientID);
        this.m_sessionsStore.dropQueue(str);
        LOG.debug("The messages of the session have been removed. CId={}", connectionDescriptor.clientID);
        return true;
    }

    private boolean cleanWillMessageAndNotifyInterceptor(ConnectionDescriptor connectionDescriptor, String str) {
        if (!connectionDescriptor.assignState(ConnectionDescriptor.ConnectionState.MESSAGES_DROPPED, ConnectionDescriptor.ConnectionState.INTERCEPTORS_NOTIFIED)) {
            return false;
        }
        LOG.info("Removing will message. ClientId={}", connectionDescriptor.clientID);
        this.m_willStore.remove(str);
        this.m_interceptor.notifyClientDisconnected(str, connectionDescriptor.getUsername());
        return true;
    }

    public void processConnectionLost(String str, Channel channel) {
        LOG.info("Processing connection lost event. CId={}", str);
        this.connectionDescriptors.removeConnection(new ConnectionDescriptor(str, channel, true));
        if (this.m_willStore.containsKey(str)) {
            forwardPublishWill(this.m_willStore.get(str), str);
            this.m_willStore.remove(str);
        }
        this.m_interceptor.notifyClientConnectionLost(str, NettyUtils.userName(channel));
    }

    public void processUnsubscribe(Channel channel, MqttUnsubscribeMessage mqttUnsubscribeMessage) {
        List list = mqttUnsubscribeMessage.payload().topics();
        String clientID = NettyUtils.clientID(channel);
        LOG.info("Processing UNSUBSCRIBE message. CId={}, topics={}", clientID, list);
        ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(clientID);
        Iterator it = list.iterator();
        while (it.hasNext()) {
            Topic topic = new Topic((String) it.next());
            if (!topic.isValid()) {
                channel.close();
                LOG.error("Topic filter is not valid. CId={}, topics={}, badTopicFilter={}", new Object[]{clientID, list, topic});
                return;
            } else {
                LOG.debug("Removing subscription. CId={}, topic={}", clientID, topic);
                this.subscriptions.removeSubscription(topic, clientID);
                sessionForClient.unsubscribeFrom(topic);
                this.m_interceptor.notifyTopicUnsubscribed(topic.toString(), clientID, NettyUtils.userName(channel));
            }
        }
        int messageId = mqttUnsubscribeMessage.variableHeader().messageId();
        MqttUnsubAckMessage mqttUnsubAckMessage = new MqttUnsubAckMessage(new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0), MqttMessageIdVariableHeader.from(messageId));
        LOG.info("Sending UNSUBACK message. CId={}, topics={}, messageId={}", new Object[]{clientID, list, Integer.valueOf(messageId)});
        channel.writeAndFlush(mqttUnsubAckMessage);
    }

    public void processSubscribe(Channel channel, MqttSubscribeMessage mqttSubscribeMessage) {
        String clientID = NettyUtils.clientID(channel);
        int messageId = Utils.messageId(mqttSubscribeMessage);
        LOG.info("Processing SUBSCRIBE message. CId={}, messageId={}", clientID, Integer.valueOf(messageId));
        RunningSubscription runningSubscription = new RunningSubscription(clientID, messageId);
        if (this.subscriptionInCourse.putIfAbsent(runningSubscription, SubscriptionState.VERIFIED) != null) {
            LOG.warn("Client sent another SUBSCRIBE message while this one was being processed CId={}, messageId={}", clientID, Integer.valueOf(messageId));
            return;
        }
        String userName = NettyUtils.userName(channel);
        List<MqttTopicSubscription> doVerify = doVerify(clientID, userName, mqttSubscribeMessage);
        MqttSubAckMessage doAckMessageFromValidateFilters = doAckMessageFromValidateFilters(doVerify, messageId);
        if (!this.subscriptionInCourse.replace(runningSubscription, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
            LOG.warn("Client sent another SUBSCRIBE message while the topic filters were being verified CId={}, messageId={}", clientID, Integer.valueOf(messageId));
            return;
        }
        LOG.info("Creating and storing subscriptions CId={}, messageId={}, topics={}", new Object[]{clientID, Integer.valueOf(messageId), doVerify});
        List<Subscription> doStoreSubscription = doStoreSubscription(doVerify, clientID);
        Iterator<Subscription> it = doStoreSubscription.iterator();
        while (it.hasNext()) {
            this.subscriptions.add(it.next().asClientTopicCouple());
        }
        LOG.info("Sending SUBACK response CId={}, messageId={}", clientID, Integer.valueOf(messageId));
        channel.writeAndFlush(doAckMessageFromValidateFilters);
        Iterator<Subscription> it2 = doStoreSubscription.iterator();
        while (it2.hasNext()) {
            publishRetainedMessagesInSession(it2.next(), userName);
        }
        if (this.subscriptionInCourse.remove(runningSubscription, SubscriptionState.STORED)) {
            return;
        }
        LOG.warn("Unable to perform the final subscription state update CId={}, messageId={}", clientID, Integer.valueOf(messageId));
    }

    private List<Subscription> doStoreSubscription(List<MqttTopicSubscription> list, String str) {
        ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(str);
        ArrayList arrayList = new ArrayList();
        for (MqttTopicSubscription mqttTopicSubscription : list) {
            if (mqttTopicSubscription.qualityOfService() != MqttQoS.FAILURE) {
                Subscription subscription = new Subscription(str, new Topic(mqttTopicSubscription.topicName()), mqttTopicSubscription.qualityOfService());
                sessionForClient.subscribe(subscription);
                arrayList.add(subscription);
            }
        }
        return arrayList;
    }

    private List<MqttTopicSubscription> doVerify(String str, String str2, MqttSubscribeMessage mqttSubscribeMessage) {
        MqttQoS mqttQoS;
        ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(str);
        ArrayList arrayList = new ArrayList();
        int messageId = Utils.messageId(mqttSubscribeMessage);
        for (MqttTopicSubscription mqttTopicSubscription : mqttSubscribeMessage.payload().topicSubscriptions()) {
            Topic topic = new Topic(mqttTopicSubscription.topicName());
            if (this.m_authorizator.canRead(topic, str2, sessionForClient.clientID)) {
                if (topic.isValid()) {
                    LOG.info("Client will be subscribed to the topic CId={}, username={}, messageId={}, topic={}", new Object[]{str, str2, Integer.valueOf(messageId), topic});
                    mqttQoS = mqttTopicSubscription.qualityOfService();
                } else {
                    LOG.error("Topic filter is not valid CId={}, username={}, messageId={}, topic={}", new Object[]{str, str2, Integer.valueOf(messageId), topic});
                    mqttQoS = MqttQoS.FAILURE;
                }
                arrayList.add(new MqttTopicSubscription(topic.toString(), mqttQoS));
            } else {
                LOG.error("Client does not have read permissions on the topic CId={}, username={}, messageId={}, topic={}", new Object[]{str, str2, Integer.valueOf(messageId), topic});
                arrayList.add(new MqttTopicSubscription(topic.toString(), MqttQoS.FAILURE));
            }
        }
        return arrayList;
    }

    private MqttSubAckMessage doAckMessageFromValidateFilters(List<MqttTopicSubscription> list, int i) {
        ArrayList arrayList = new ArrayList();
        Iterator<MqttTopicSubscription> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(Integer.valueOf(it.next().qualityOfService().value()));
        }
        return new MqttSubAckMessage(new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0), MqttMessageIdVariableHeader.from(i), new MqttSubAckPayload(arrayList));
    }

    private void publishRetainedMessagesInSession(Subscription subscription, String str) {
        LOG.info("Retrieving retained messages CId={}, topics={}", subscription.getClientId(), subscription.getTopicFilter());
        Collection<IMessagesStore.StoredMessage> searchMatching = this.m_messagesStore.searchMatching(topic -> {
            return topic.match(subscription.getTopicFilter());
        });
        if (!searchMatching.isEmpty()) {
            LOG.info("Publishing retained messages CId={}, topics={}, messagesNo={}", new Object[]{subscription.getClientId(), subscription.getTopicFilter(), Integer.valueOf(searchMatching.size())});
        }
        this.internalRepublisher.publishRetained(this.m_sessionsStore.sessionForClient(subscription.getClientId()), searchMatching);
        this.m_interceptor.notifyTopicSubscribed(subscription, str);
    }

    public void notifyChannelWritable(Channel channel) {
        ClientSession sessionForClient = this.m_sessionsStore.sessionForClient(NettyUtils.clientID(channel));
        boolean z = false;
        while (channel.isWritable() && !z) {
            IMessagesStore.StoredMessage poll = sessionForClient.queue().poll();
            if (poll == null) {
                z = true;
            } else {
                channel.write(InternalRepublisher.createPublishForQos(poll.getTopic(), poll.getQos(), poll.getPayload(), poll.isRetained(), 0));
            }
        }
        channel.flush();
    }

    public void addInterceptHandler(InterceptHandler interceptHandler) {
        this.m_interceptor.addInterceptHandler(interceptHandler);
    }

    public void removeInterceptHandler(InterceptHandler interceptHandler) {
        this.m_interceptor.removeInterceptHandler(interceptHandler);
    }

    public IMessagesStore getMessagesStore() {
        return this.m_messagesStore;
    }

    public ISessionsStore getSessionsStore() {
        return this.m_sessionsStore;
    }
}
