原创

emqx配置ssl/tsl实现双向认证

温馨提示:
本文最后更新于 2023年03月08日,已超过 387 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

    emqx是一种基于mqtt协议实现的消息框架,目前很多地方已经开始使用。简单的emqx安装配置,其实就可以作为一个broker来使用,客户端只需要通过ip、端口、用户名、密码、clientid就可以连接了,至于发送消息,直接在发送的时候指定topic即可。

    简单的emqx安装配置,使用的协议是:mqtt:tcp,使用的url是tcp://ip:1883。这种方式,其实可以很容易被模拟(理论上的可能),所以有了mqtts:tls协议,一般使用ssl协议来实现。这也是本文所要阐述的问题。

    我在配置ssl证书的时候,服务启动没问题,最后客户端连接,死活连接失败。最后看了官方的文档,按照那个文档很容易就配置成功,并且客户端连接也成功了。

    其实我大概知道我配置失败的原因,可能是因为证书中的一个跟主机ip相关的设置问题,但是这个问题我按照一些博客的提示做了修改,但是后来还是没有成功,很郁闷,最后按照官方配置,很快设置好了。思路基本一模一样,而且配置证书的步骤也都类似,先生成私钥,然后生成证书请求,最后根据根证书生成服务端证书和客户端证书。

    emqx的下载安装这里不做过多的说明,在官网直接下载,解压然后运行bin/emqx start就可以了。

   emqx安装目录下有一个etc/certs目录,它自带了一些根证书、服务端证书、客户端证书,但是为了保险起见,最好自己根据自己的主机地址生成。

   下面基本是按照emqx官方的一篇博客来配置的,就是主机地址做了修改。

    以下的证书配置,是自签名证书,不是花钱购买的那种服务器证书。

    1、生成根证书私钥

openssl genrsa -out ca.key 2048

    2、生成根证书

openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.pem

    这一步会有交互提示,需要输入省份、城市、公司、组织、姓名、邮件,随便填写就可以,这里的信息不会影响根证书使用。 

    3、生成服务端证书私钥

openssl genrsa -out emqx.key 2048

     4、生成服务端证书请求

    按照官方博客的意思,这里需要手动创建一个openssl.cnf的文件,其实主要就是设置IP地址

[req]
default_bits  = 2048
distinguished_name = req_distinguished_name
req_extensions = req_ext
x509_extensions = v3_req
prompt = no
[req_distinguished_name]
countryName = CN
stateOrProvinceName = Hubei
localityName = Wuhan
organizationName = EMQX
commonName = CA
[req_ext]
subjectAltName = @alt_names
[v3_req]
subjectAltName = @alt_names
[alt_names]
IP.1 = 192.168.226.100
DNS.1 = 192.168.226.100

    修改emqx服务器的ip和dns

    执行如下命令生成服务端证书请求:

openssl req -new -key ./emqx.key -config openssl.cnf -out emqx.csr

     5、生成服务端证书

openssl x509 -req -in ./emqx.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out emqx.pem -days 3650 -sha256 -extensions v3_req -extfile openssl.cnf

     6、生成客户端证书私钥

openssl genrsa -out client.key 2048

     7、生成客户端证书请求

openssl req -new -key client.key -out client.csr -subj "/C=CN/ST=Zhejiang/L=Hangzhou/O=EMQX/CN=client"

      8、生成客户端证书

openssl x509 -req -days 3650 -in client.csr -CA ca.pem -CAkey ca.key -CAcreateserial -out client.pem

一顿操作之后certs目录下生成的文件:

 

以上,只是生成了相关证书,在emqx的配置文件etc/emqx.conf中还需要做相关设置:

listener.ssl.external.keyfile = etc/certs/emqx.key
listener.ssl.external.certfile = etc/certs/emqx.pem
listener.ssl.external.cacertfile = etc/certs/ca.pem
listener.ssl.external.verify = verify_peer
listener.ssl.external.fail_if_no_peer_cert = true

    这几个主要的配置在原来的emqx.conf中都有,只需要打开或者修改配置值即可,主要是证书的名字。

    重启emqx服务,验证。bin/emqx restart

     这里使用mqttbox来验证,其实mqtt.x也是可以的,客户端需要的文件:ca.pem,client.pem,client.key 

     普通的mqtt/tcp协议连接配置:

    mqtts/tls协议连接配置:

 

    配置需要注意的地方在上图中已经标识出来了,协议类型:mqtts/tls ,证书类型:自签名证书,ca文件:ca.pem,客户端证书:client.pem,客户端私钥:client.key。

     配置完成,保存,可以看到连接状态变为绿色,如果是红色,那么就需要检查证书是否配置正确,包括生成和在emqx.conf配置文件中的设置。

     发送消息测试:

  下面通过java代码来验证,这里使用springboot+spring-integration-mqtt来实现。

  maven工程的pom.xml

<dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
    	<groupId>org.springframework.integration</groupId>
    	<artifactId>spring-integration-mqtt</artifactId>
    </dependency>
    <dependency>
    	<groupId>org.eclipse.paho</groupId>
    	<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    	<version>1.2.5</version>
    </dependency>
    
    <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcpkix-jdk15on</artifactId>
            <version>1.47</version>
     </dependency>

    application-dev.yml

mqtt:
  serverURIs: ssl://192.168.226.100:8883
  username: admin
  password: public
  client:
    id: ${random.value}
  topic: test

     MqttConfig.java

package com.huali.mec.receiver.config;
import java.util.Objects;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import com.huali.mec.receiver.util.SslUtil;
@IntegrationComponentScan
@Configuration
public class MqttConfig {
	public static final Logger log  = LoggerFactory.getLogger(MqttConfig.class);
	
	public static final String OUTPUT_CHANNEL = "mqttOutputChannel";
	public static final String INPUT_CHANNEL = "mqttInputChannel";
	
	@Value("${mqtt.username}")
	private String username ;
	@Value("${mqtt.password}")
	private String password ;
	@Value("${mqtt.serverURIs}")
	private String serverURIs ;
	@Value("${mqtt.client.id}")
	private String clientId;
	@Value("${mqtt.topic}")
	private String defaultTopic ;
	
	@Bean
	public MqttPahoClientFactory clientFactory() {
		final MqttConnectOptions options = new MqttConnectOptions();
		options.setServerURIs(new String[] {serverURIs});
		options.setUserName(username);
		options.setPassword(password.toCharArray());
		options.setKeepAliveInterval(10);
		options.setAutomaticReconnect(true);
		try {
			options.setSocketFactory(SslUtil.getSocketFactory("src/main/resources/ssl/ca.pem", "src/main/resources/ssl/client.pem", "src/main/resources/ssl/client.key", password));
		} catch (Exception e) {
			e.printStackTrace();
		}
		final DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
		factory.setConnectionOptions(options);
		return factory;
	}
	
	@Bean(value = OUTPUT_CHANNEL)
	public MessageChannel mqttOutChannel() {
		return new DirectChannel();
	}
	
	@Bean
	@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
	public MessageHandler mqttOutbound() {
		final MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, clientFactory());
		handler.setDefaultQos(1);
		handler.setDefaultRetained(false);
		handler.setDefaultTopic(defaultTopic);
		handler.setAsync(false);
		handler.setAsyncEvents(false);
		return handler;
	}
	
	
	@Bean
	public MessageChannel mqttInputChannel() {
		return new DirectChannel();
	}
	
	@Bean
	public MessageProducer inbound() {
		MqttPahoMessageDrivenChannelAdapter adapter = 
				new MqttPahoMessageDrivenChannelAdapter(
				clientId+"_inbound", clientFactory(), defaultTopic);
		adapter.setCompletionTimeout(3000);
		adapter.setConverter(new DefaultPahoMessageConverter());
		adapter.setQos(1);
		adapter.setOutputChannel(mqttInputChannel());
		return adapter;
	}
	
	@Autowired
	private ApplicationEventPublisher eventPublisher ;
	
	@Bean
	@ServiceActivator(inputChannel = INPUT_CHANNEL)
	public MessageHandler handler() {
		return message -> {
			String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
			log.info("topic :{} ,payload :{}",topic,message.getPayload().toString());
			//eventPublisher.publishEvent(new MqttEvent(this, topic, message.getPayload().toString()));
		};
	}
	
}

    SslUtil.java,读取客户端证书并生成SSLSocketFactory给MQTT连接使用

package com.huali.mec.receiver.util;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyPair;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.X509Certificate;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMReader;
import org.bouncycastle.openssl.PasswordFinder;

public class SslUtil {
	public static SSLSocketFactory getSocketFactory(final String caCrtFile,final String clientCrtFile,
			final String keyFile,final String password) throws Exception {
		Security.addProvider(new BouncyCastleProvider());
		//load ca certificate
		PEMReader reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(caCrtFile)))));
		X509Certificate caCert = (X509Certificate)reader.readObject();
		reader.close();
		
		//load client certificate
		reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(clientCrtFile)))));
		X509Certificate clientCert = (X509Certificate)reader.readObject();
		reader.close();
		//load client key
		reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(keyFile)))),
				new PasswordFinder() {
					
					@Override
					public char[] getPassword() {
						return password.toCharArray();
					}
		});
		
		KeyPair key = (KeyPair)reader.readObject();
		
		reader.close();
		
		KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
		
		keyStore.load(null, null);
		keyStore.setCertificateEntry("ca-certificate", caCert);
		TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
		tmf.init(keyStore);
		
		KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
		ks.load(null, null);
		ks.setCertificateEntry("certificate", clientCert);
		ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),new java.security.cert.Certificate[] {clientCert});
		KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
		kmf.init(ks,password.toCharArray());
		
		SSLContext context = SSLContext.getInstance("TLSv1.2");
		context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
		return context.getSocketFactory();
		
	}
}

客户端使用的三个证书文件放到resources/ssl目录下,ca.pem,client.pem,client.key

 

 

     启动springboot项目,mqttbox客户端发送数据到test这个topic上,这边监听:

     控制台打印的消息:

2022-01-21 11:46:12.019  INFO 18368 --- [7785b5b_inbound] c.huali.mec.receiver.config.MqttConfig   : topic :test ,payload :{"id":1,"name":"buejee"}

正文到此结束
本文目录