package io.moquette.server;

import com.hazelcast.config.ClasspathXmlConfig;
import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import io.moquette.BrokerConstants;
import io.moquette.connections.IConnectionsManager;
import io.moquette.interception.HazelcastInterceptHandler;
import io.moquette.interception.InterceptHandler;
import io.moquette.logging.LoggingUtils;
import io.moquette.server.config.FileResourceLoader;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;
import io.moquette.server.config.ResourceLoaderConfig;
import io.moquette.server.netty.NettyAcceptor;
import io.moquette.spi.impl.ProtocolProcessor;
import io.moquette.spi.impl.ProtocolProcessorBootstrapper;
import io.moquette.spi.impl.subscriptions.Subscription;
import io.moquette.spi.security.IAuthenticator;
import io.moquette.spi.security.IAuthorizator;
import io.moquette.spi.security.ISslContextCreator;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/moquette/server/Server.class */
/**
 * Lifecycle holder for a Moquette MQTT broker instance.
 *
 * <p>All {@code startServer} overloads funnel into
 * {@link #startServer(IConfig, List, ISslContextCreator, IAuthenticator, IAuthorizator)}, which
 * creates the scheduler, optionally starts an embedded Hazelcast instance, initializes the
 * protocol processor and finally the network acceptor. {@link #stopServer()} releases those
 * resources again.
 *
 * <p>Methods that need a running broker ({@link #internalPublish}, {@link #addInterceptHandler},
 * {@link #removeInterceptHandler}) throw {@link IllegalStateException} while the server is not
 * marked as initialized.
 */
public class Server {
    private static final Logger LOG = LoggerFactory.getLogger(Server.class);

    /** Value of the intercept-handler property that switches on the embedded Hazelcast instance. */
    private static final String HZ_INTERCEPT_HANDLER = HazelcastInterceptHandler.class.getCanonicalName();

    /** Name of the Hazelcast topic this server listens on. */
    private static final String HAZELCAST_TOPIC = "moquette";

    private ServerAcceptor m_acceptor;

    /** Set to {@code true} at the end of a successful start and back to {@code false} on stop. */
    private volatile boolean m_initialized;

    private ProtocolProcessor m_processor;
    private HazelcastInstance hazelcastInstance;
    private ProtocolProcessorBootstrapper m_processorBootstrapper;
    private ScheduledExecutorService scheduler;

    /**
     * Starts a broker with the default configuration file and registers a JVM shutdown hook
     * that calls {@link #stopServer()}.
     *
     * @param strArr command line arguments (unused)
     * @throws IOException if the server fails to start
     */
    public static void main(String[] strArr) throws IOException {
        final Server server = new Server();
        server.startServer();
        System.out.println("Server started, version 0.10");
        // The hook is only registered after a successful start, so it never runs against a
        // server whose startup threw.
        Runtime.getRuntime().addShutdownHook(new Thread(server::stopServer));
    }

    /**
     * Starts the server using the default configuration file, see {@link #defaultConfigFile()}.
     *
     * @throws IOException if the server fails to start
     */
    public void startServer() throws IOException {
        File configFile = defaultConfigFile();
        LOG.info("Starting Moquette server. Configuration file path={}", configFile.getAbsolutePath());
        startServer(new ResourceLoaderConfig(new FileResourceLoader(configFile)));
    }

    /**
     * Builds the default configuration file location: {@link IConfig#DEFAULT_CONFIG} resolved
     * against the {@code moquette.path} system property.
     *
     * <p>When the property is not set the parent is {@code null}, in which case
     * {@link File#File(String, String)} uses the child pathname on its own.
     */
    private static File defaultConfigFile() {
        String parentDir = System.getProperty("moquette.path");
        return new File(parentDir, IConfig.DEFAULT_CONFIG);
    }

    /**
     * Starts the server reading its configuration from the given file.
     *
     * @param file the configuration file; must not be {@code null}
     * @throws IOException if the server fails to start
     */
    public void startServer(File file) throws IOException {
        LOG.info("Starting Moquette server. Configuration file path={}", file.getAbsolutePath());
        startServer(new ResourceLoaderConfig(new FileResourceLoader(file)));
    }

    /**
     * Starts the server with a configuration held in a {@link Properties} object.
     *
     * @param properties the configuration properties, wrapped in a {@link MemoryConfig}
     * @throws IOException if the server fails to start
     */
    public void startServer(Properties properties) throws IOException {
        LOG.info("Starting Moquette server using properties object");
        startServer(new MemoryConfig(properties));
    }

    /**
     * Starts the server with the given configuration and no intercept handlers.
     *
     * @param iConfig the broker configuration
     * @throws IOException if the server fails to start
     */
    public void startServer(IConfig iConfig) throws IOException {
        LOG.info("Starting Moquette server using IConfig instance...");
        startServer(iConfig, null);
    }

    /**
     * Starts the server with the given configuration and intercept handlers, using the default
     * SSL context creator and passing no authenticator or authorizator.
     *
     * @param iConfig the broker configuration
     * @param list the intercept handlers to register; may be {@code null}
     * @throws IOException if the server fails to start
     */
    public void startServer(IConfig iConfig, List<? extends InterceptHandler> list) throws IOException {
        LOG.info("Starting moquette server using IConfig instance and intercept handlers");
        startServer(iConfig, list, null, null, null);
    }

    /**
     * Starts the server. This is the overload every other {@code startServer} variant ends up in.
     *
     * <p>Steps, in order: create the scheduler; copy the intercept-handler system property (if
     * set) into {@code iConfig}; optionally start Hazelcast; initialize the protocol processor;
     * initialize the acceptor; mark the server as initialized.
     *
     * <p>If any step throws, the resources created by earlier steps (scheduler, Hazelcast
     * instance) are not released by this method.
     *
     * @param iConfig the broker configuration; may be modified as described above
     * @param list the intercept handlers to register; {@code null} is treated as an empty list
     * @param iSslContextCreator the SSL context creator; when {@code null} a
     *        {@code DefaultMoquetteSslContextCreator} built from {@code iConfig} is used
     * @param iAuthenticator passed through to the protocol processor bootstrapper; may be {@code null}
     * @param iAuthorizator passed through to the protocol processor bootstrapper; may be {@code null}
     * @throws IOException if the Hazelcast configuration file cannot be found or a later
     *         initialization step fails with an I/O error
     */
    public void startServer(IConfig iConfig, List<? extends InterceptHandler> list, ISslContextCreator iSslContextCreator, IAuthenticator iAuthenticator, IAuthorizator iAuthorizator) throws IOException {
        List<? extends InterceptHandler> handlers = list;
        if (handlers == null) {
            handlers = Collections.emptyList();
        }
        LOG.info("Starting Moquette Server. MQTT message interceptors={}", LoggingUtils.getInterceptorIds(handlers));

        this.scheduler = Executors.newScheduledThreadPool(1);

        // A system property, when present, overrides the intercept handler named in the config.
        String handlerOverride = System.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
        if (handlerOverride != null) {
            iConfig.setProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME, handlerOverride);
        }
        configureCluster(iConfig);

        LOG.info("Configuring Using persistent store file, path={}", iConfig.getProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME));
        this.m_processorBootstrapper = new ProtocolProcessorBootstrapper();
        ProtocolProcessor processor = this.m_processorBootstrapper.init(iConfig, handlers, iAuthenticator, iAuthorizator, this);
        LOG.info("Initialized MQTT protocol processor");

        ISslContextCreator sslContextCreator = iSslContextCreator;
        if (sslContextCreator == null) {
            LOG.warn("Using default SSL context creator");
            sslContextCreator = new DefaultMoquetteSslContextCreator(iConfig);
        }

        LOG.info("Binding server to the configured ports");
        this.m_acceptor = new NettyAcceptor();
        this.m_acceptor.initialize(processor, iConfig, sslContextCreator);
        this.m_processor = processor;

        LOG.info("Moquette server has been initialized successfully");
        // Published last so other threads only see "initialized" once every field above is set.
        this.m_initialized = true;
    }

    /**
     * Starts an embedded Hazelcast instance when the configured intercept handler is
     * {@link HazelcastInterceptHandler}; otherwise does nothing besides logging.
     *
     * <p>If a Hazelcast configuration path is configured, it is loaded from the classpath when
     * a resource with that name exists there and from the file system otherwise. Without a
     * configured path, Hazelcast's default configuration is used.
     *
     * @param iConfig the broker configuration to read the properties from
     * @throws FileNotFoundException if the Hazelcast configuration is loaded from the file
     *         system and the file does not exist
     */
    private void configureCluster(IConfig iConfig) throws FileNotFoundException {
        LOG.info("Configuring embedded Hazelcast instance");
        String interceptHandler = iConfig.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
        // equals() on the constant already yields false for a null property value.
        if (!HZ_INTERCEPT_HANDLER.equals(interceptHandler)) {
            LOG.info("There are no Hazelcast intercept handlers. The server won't start a Hazelcast instance.");
            return;
        }

        String hzConfigPath = iConfig.getProperty(BrokerConstants.HAZELCAST_CONFIGURATION);
        if (hzConfigPath != null) {
            boolean onClasspath = getClass().getClassLoader().getResource(hzConfigPath) != null;
            // Both XML config flavours are only related through their common supertype Config.
            Config hzConfig = onClasspath
                    ? new ClasspathXmlConfig(hzConfigPath)
                    : new FileSystemXmlConfig(hzConfigPath);
            LOG.info("Starting Hazelcast instance. ConfigurationFile={}", hzConfig);
            this.hazelcastInstance = Hazelcast.newHazelcastInstance(hzConfig);
        } else {
            LOG.info("Starting Hazelcast instance with default configuration");
            this.hazelcastInstance = Hazelcast.newHazelcastInstance();
        }
        listenOnHazelCastMsg();
    }

    /** Registers a {@code HazelcastListener} bound to this server on the Hazelcast topic. */
    private void listenOnHazelCastMsg() {
        LOG.info("Subscribing to Hazelcast topic. TopicName={}", HAZELCAST_TOPIC);
        getHazelcastInstance().getTopic(HAZELCAST_TOPIC).addMessageListener(new HazelcastListener(this));
    }

    /**
     * @return the embedded Hazelcast instance, or {@code null} if none has been started
     */
    public HazelcastInstance getHazelcastInstance() {
        return this.hazelcastInstance;
    }

    /**
     * Hands a publish message to the protocol processor on behalf of the given client id.
     *
     * @param mqttPublishMessage the message to publish
     * @param str the client id the message is published for
     * @throws IllegalStateException if the server is not started
     */
    public void internalPublish(MqttPublishMessage mqttPublishMessage, String str) {
        final int messageId = mqttPublishMessage.variableHeader().messageId();
        if (!this.m_initialized) {
            LOG.error("Moquette is not started, internal message cannot be published. CId={}, messageId={}", str, messageId);
            throw new IllegalStateException("Can't publish on a server is not yet started");
        }
        LOG.debug("Publishing message. CId={}, messageId={}", str, messageId);
        this.m_processor.internalPublish(mqttPublishMessage, str);
    }

    /**
     * Stops the server: closes the acceptor, shuts down the protocol processor bootstrapper,
     * the embedded Hazelcast instance (if any) and the scheduler.
     *
     * <p>Components that were never created are skipped, so this method can be called on a
     * server that was not started or whose startup failed part-way.
     */
    public void stopServer() {
        LOG.info("Unbinding server from the configured ports");
        if (this.m_acceptor != null) {
            this.m_acceptor.close();
        }
        LOG.trace("Stopping MQTT protocol processor");
        if (this.m_processorBootstrapper != null) {
            this.m_processorBootstrapper.shutdown();
        }
        this.m_initialized = false;
        if (this.hazelcastInstance != null) {
            LOG.trace("Stopping embedded Hazelcast instance");
            try {
                this.hazelcastInstance.shutdown();
            } catch (HazelcastInstanceNotActiveException ignored) {
                // Already shut down by someone else; nothing left to release.
                LOG.warn("embedded Hazelcast instance is already shut down.");
            }
        }
        if (this.scheduler != null) {
            this.scheduler.shutdown();
        }
        LOG.info("Moquette server has been stopped.");
    }

    /**
     * @return the subscriptions reported by the protocol processor bootstrapper, or
     *         {@code null} if the server has never been started
     */
    public List<Subscription> getSubscriptions() {
        if (this.m_processorBootstrapper == null) {
            return null;
        }
        return this.m_processorBootstrapper.getSubscriptions();
    }

    /**
     * Registers an additional intercept handler on the running server.
     *
     * @param interceptHandler the handler to add; must not be {@code null}
     * @throws IllegalStateException if the server is not started
     */
    public void addInterceptHandler(InterceptHandler interceptHandler) {
        if (!this.m_initialized) {
            LOG.error("Moquette is not started, MQTT message interceptor cannot be added. InterceptorId={}", interceptHandler.getID());
            throw new IllegalStateException("Can't register interceptors on a server that is not yet started");
        }
        LOG.info("Adding MQTT message interceptor. InterceptorId={}", interceptHandler.getID());
        this.m_processor.addInterceptHandler(interceptHandler);
    }

    /**
     * Removes an intercept handler from the running server.
     *
     * @param interceptHandler the handler to remove; must not be {@code null}
     * @throws IllegalStateException if the server is not started
     */
    public void removeInterceptHandler(InterceptHandler interceptHandler) {
        if (!this.m_initialized) {
            LOG.error("Moquette is not started, MQTT message interceptor cannot be removed. InterceptorId={}", interceptHandler.getID());
            throw new IllegalStateException("Can't deregister interceptors from a server that is not yet started");
        }
        LOG.info("Removing MQTT message interceptor. InterceptorId={}", interceptHandler.getID());
        this.m_processor.removeInterceptHandler(interceptHandler);
    }

    /**
     * Returns the connection descriptors held by the protocol processor bootstrapper.
     *
     * <p>Unlike {@link #getSubscriptions()}, this method does not guard against a server that
     * has never been started and fails with a {@link NullPointerException} in that case.
     *
     * @return the connections manager
     */
    public IConnectionsManager getConnectionsManager() {
        return this.m_processorBootstrapper.getConnectionDescriptors();
    }

    /**
     * @return the protocol processor, or {@code null} if the server has not been started
     */
    public ProtocolProcessor getProcessor() {
        return this.m_processor;
    }

    /**
     * @return the scheduler created on start, or {@code null} if the server has not been started
     */
    public ScheduledExecutorService getScheduler() {
        return this.scheduler;
    }
}
