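1. Introduction to MQTT

    • MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol based on the publish/subscribe pattern. It is widely used in IoT and similar scenarios for communication between devices. Using MQTT from Java requires an MQTT client library such as Eclipse Paho MQTT.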
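To make the publish/subscribe idea concrete, here is a minimal in-memory sketch in plain Java. It is not MQTT and involves no network; `TinyBroker` and its methods are hypothetical names invented for this illustration. It only shows that publishers and subscribers are coupled through a topic name rather than through each other.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy in-memory broker (hypothetical): shows the publish/subscribe pattern, not the MQTT protocol. */
final class TinyBroker {
    // topic name -> callbacks registered for that topic
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    /** Registers a callback that runs for every message published to the topic. */
    void subscribe(String topic, Consumer<String> callback) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(callback);
    }

    /** Hands the payload to every subscriber of the topic and returns how many received it. */
    int publish(String topic, String payload) {
        List<Consumer<String>> callbacks =
                subscribers.getOrDefault(topic, Collections.emptyList());
        for (Consumer<String> callback : callbacks) {
            callback.accept(payload);
        }
        return callbacks.size();
    }

    public static void main(String[] args) {
        TinyBroker broker = new TinyBroker();
        broker.subscribe("topic1", payload -> System.out.println("subscriber A got: " + payload));
        broker.subscribe("topic1", payload -> System.out.println("subscriber B got: " + payload));
        int delivered = broker.publish("topic1", "Hello, MQTT!");
        System.out.println("delivered to " + delivered + " subscriber(s)");
    }
}
```

The publisher never holds a reference to a subscriber. A real MQTT broker plays the role of `TinyBroker` across the network.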
  2. Add the dependency

    • Maven project: for a project built with Maven, add the following dependency to pom.xml:

      <dependency>
          <groupId>org.eclipse.paho</groupId>
          <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
          <version>1.2.5</version>
      </dependency>
      
    • Gradle project: for a project built with Gradle, add the following dependency to build.gradle:

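      // mqttv5 client, matching the org.eclipse.paho.mqttv5 imports used in the sample code
      implementation 'org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5'
      // for the older mqttv3 API, use 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5' instead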
      
  3. Create an MQTT client and connect to the server

    • Sample code

          import org.eclipse.paho.mqttv5.client.IMqttToken;
          import org.eclipse.paho.mqttv5.client.MqttCallback;
          import org.eclipse.paho.mqttv5.client.MqttClient;
          import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
          import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
          import org.eclipse.paho.mqttv5.common.MqttException;
          import org.eclipse.paho.mqttv5.common.MqttMessage;
          import org.eclipse.paho.mqttv5.common.MqttSubscription;
          import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
          import java.nio.charset.StandardCharsets;
          import java.util.UUID;
          public class MqttExample {
              public static void main(String[] args) {
                  try {
                      // Create the MQTT client instance; any ID that is unique on the broker works
                      String clientId = "java-client-" + UUID.randomUUID();
                      MqttClient client = new MqttClient("tcp://mqtt.example.com:1883", clientId);
                      // Configure the connection options (the mqttv5 client takes the password as bytes)
                      MqttConnectionOptions options = new MqttConnectionOptions();
                      options.setUserName("your_username");
                      options.setPassword("your_password".getBytes(StandardCharsets.UTF_8));
                      // Connect to the MQTT server; the blocking client returns once connected
                      client.connect(options);
                      System.out.println("Connected to the MQTT server");
                      // Publish and subscribe operations go here
                      // ......
                      // Disconnect
                      client.disconnect();
                      System.out.println("Disconnected from the MQTT server");
                  } catch (MqttException e) {
                      e.printStackTrace();
                  }
              }
          }
      
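    • Code explanation

      • First, create an MQTT client instance with the MqttClient class. The first constructor argument is the MQTT server address (here tcp://mqtt.example.com:1883); the second is the client ID, which must be unique among the clients connected to that server.
      • Then, create an MqttConnectionOptions object to configure the connection, for example the username and password.
      • Next, pass the connection options to the client's connect method to connect to the MQTT server, and wait for the connection to complete before continuing.
      • Once connected, publish and subscribe operations go where the placeholder comment is. Finally, call disconnect to close the connection to the server.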
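The server address passed to the constructor is an ordinary URI, so its parts can be checked before a client is ever created. The sketch below uses only `java.net.URI` from the JDK; `ServerUriCheck` and `describe` are hypothetical names, and falling back to port 1883 when none is given is an assumption based on the conventional port for plain MQTT.

```java
import java.net.URI;

/** Hypothetical helper: splits a broker address such as tcp://mqtt.example.com:1883 into its parts. */
final class ServerUriCheck {

    /** Returns "scheme host port" for the given address, or throws if it has no scheme or host. */
    static String describe(String serverUri) {
        URI uri = URI.create(serverUri);
        if (uri.getScheme() == null || uri.getHost() == null) {
            throw new IllegalArgumentException("expected scheme://host[:port], got: " + serverUri);
        }
        // Assumption: default to 1883, the conventional port for plain (non-TLS) MQTT.
        int port = uri.getPort() == -1 ? 1883 : uri.getPort();
        return uri.getScheme() + " " + uri.getHost() + " " + port;
    }

    public static void main(String[] args) {
        System.out.println(describe("tcp://mqtt.example.com:1883"));
    }
}
```

A check like this catches a missing `tcp://` prefix early, with a clearer message than a failed connection attempt.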
  4. Subscribe to a topic (Subscribe)

    • Sample code

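       // Needs imports: MqttCallback, MqttDisconnectResponse, IMqttToken (org.eclipse.paho.mqttv5.client),
       // MqttProperties (org.eclipse.paho.mqttv5.common.packet) and java.nio.charset.StandardCharsets
       // Register the callback before subscribing so that no early message is missed
       client.setCallback(new MqttCallback() {
           @Override
           public void messageArrived(String topic, MqttMessage message) throws Exception {
               String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
               System.out.println("Message received on topic " + topic + ": " + payload);
           }
           @Override
           public void deliveryComplete(IMqttToken token) {
           }
           @Override
           public void disconnected(MqttDisconnectResponse disconnectResponse) {
           }
           @Override
           public void mqttErrorOccurred(MqttException exception) {
           }
           @Override
           public void connectComplete(boolean reconnect, String serverURI) {
           }
           @Override
           public void authPacketArrived(int reasonCode, MqttProperties properties) {
           }
       });
       // Subscribe to topic1 with QoS 1
       MqttSubscription[] subscriptions = { new MqttSubscription("topic1", 1) };
       client.subscribe(subscriptions);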
      
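    • Code explanation

      • The subscriptions are described by MqttSubscription objects. Each one holds the name of a topic to subscribe to (topic1 here) and a QoS (Quality of Service) level (1 here). There are three QoS levels: 0 means at-most-once delivery, 1 means at-least-once delivery, and 2 means exactly-once delivery.
      • The client's subscribe method subscribes to the given topics, and setCallback registers a callback object. Whenever a message arrives on a subscribed topic, messageArrived is called; this is where the received message is handled, for example by printing its topic and payload.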
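Because the QoS level is passed around as a bare integer, it is easy to lose track of what 0, 1 and 2 mean. One way to keep the meaning visible in application code is a small enum. This is a sketch only: `QosLevel` is a hypothetical type and not part of the Paho library, which expects the plain integer.

```java
/** Hypothetical enum naming the three MQTT QoS levels and their delivery guarantees. */
enum QosLevel {
    AT_MOST_ONCE(0, "at most once"),
    AT_LEAST_ONCE(1, "at least once"),
    EXACTLY_ONCE(2, "exactly once");

    private final int code;
    private final String guarantee;

    QosLevel(int code, String guarantee) {
        this.code = code;
        this.guarantee = guarantee;
    }

    /** The integer to hand to the client library. */
    int code() {
        return code;
    }

    String guarantee() {
        return guarantee;
    }

    /** Maps a numeric level back to the enum; rejects anything other than 0, 1 or 2. */
    static QosLevel fromCode(int code) {
        for (QosLevel level : values()) {
            if (level.code == code) {
                return level;
            }
        }
        throw new IllegalArgumentException("QoS must be 0, 1 or 2, got " + code);
    }

    public static void main(String[] args) {
        for (QosLevel level : values()) {
            System.out.println("QoS " + level.code() + ": " + level.guarantee());
        }
    }
}
```

With this in place, a subscription for topic1 could be written with `QosLevel.AT_LEAST_ONCE.code()` in place of the literal 1.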
  5. Publish a message (Publish)

    • Sample code

      // Publish a message (needs import java.nio.charset.StandardCharsets)
      MqttMessage message = new MqttMessage("Hello, MQTT!".getBytes(StandardCharsets.UTF_8));
      // The QoS level is set on the message itself
      message.setQos(1);
      client.publish("topic1", message);
      
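    • Code explanation

      • First, create an MqttMessage object, passing the message content converted to a byte array to the constructor (the content here is Hello, MQTT!).
      • Then, set the QoS level for the message (1 here).
      • Finally, call the client's publish method to publish the message object to the target topic (topic1 here).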
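An MQTT payload is just bytes, so publisher and subscriber have to agree on how text is encoded. Calling `getBytes()` or `new String(bytes)` without a charset depends on the JVM's default charset, which can differ between machines. The sketch below pins both sides to UTF-8 using only the JDK; `PayloadCodec` is a hypothetical helper name.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: converts between text and MQTT payload bytes with a fixed charset. */
final class PayloadCodec {

    /** Encodes text for use as a message payload. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a received payload back into text. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("Hello, MQTT!");
        System.out.println(payload.length + " bytes in the payload");
        System.out.println("decoded: " + decode(payload));
    }
}
```

The byte array returned by `encode` is what would be passed to the MqttMessage constructor, and `decode` is what a messageArrived handler would apply to `message.getPayload()`.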
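  6. MQTT command-line tools on Windows

    On Windows, you can install mosquitto to subscribe to and publish MQTT messages from the command line. Some of the commonly used functions are listed below.

    • Subscribe to a topic (mosquitto_sub)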

      Usage: mosquitto_sub {[-h host] [--unix path] [-p port] [-u username] [-P password] -t topic | -L URL [-t topic]}
                           [-c] [-k keepalive] [-q qos] [-x session-expiry-interval]
                           [-C msg_count] [-E] [-R] [--retained-only] [--remove-retained] [-T filter_out] [-U topic ...]
                           [-F format]
                           [-A bind_address] [--nodelay]
                           [-i id] [-I id_prefix]
                           [-d] [-N] [--quiet] [-v]
                           [--will-topic [--will-payload payload] [--will-qos qos] [--will-retain]]
                           [{--cafile file | --capath dir} [--cert file] [--key file]
                             [--ciphers ciphers] [--insecure]
                             [--tls-alpn protocol]
                             [--tls-engine engine] [--keyform keyform] [--tls-engine-kpass-sha1]]
                             [--tls-use-os-certs]
                           [--psk hex-key --psk-identity identity [--ciphers ciphers]]
                           [--proxy socks-url]
                           [-D command identifier value]
             mosquitto_sub --help
      
       -A : bind the outgoing socket to this host/ip address. Use to control which interface
            the client communicates over.
       -c : disable clean session/enable persistent client mode
            When this argument is used, the broker will be instructed not to clean existing sessions
            for the same client id when the client connects, and sessions will never expire when the
            client disconnects. MQTT v5 clients can change their session expiry interval with the -x
            argument.
       -C : disconnect and exit after receiving the 'msg_count' messages.
       -d : enable debug messages.
       -D : Define MQTT v5 properties. See the documentation for more details.
       -E : Exit once all subscriptions have been acknowledged by the broker.
       -F : output format.
       -h : mqtt host to connect to. Defaults to localhost.
       -i : id to use for this client. Defaults to mosquitto_sub_ appended with the process id.
       -I : define the client id as id_prefix appended with the process id. Useful for when the
            broker is using the clientid_prefixes option.
       -k : keep alive in seconds for this client. Defaults to 60.
       -L : specify user, password, hostname, port and topic as a URL in the form:
            mqtt(s)://[username[:password]@]host[:port]/topic
       -N : do not add an end of line character when printing the payload.
       -p : network port to connect to. Defaults to 1883 for plain MQTT and 8883 for MQTT over TLS.
       -P : provide a password
       -q : quality of service level to use for the subscription. Defaults to 0.
       -R : do not print stale messages (those with retain set).
       -t : mqtt topic to subscribe to. May be repeated multiple times.
       -T : topic string to filter out of results. May be repeated.
       -u : provide a username
       -U : unsubscribe from a topic. May be repeated.
       -v : print published messages verbosely.
       -V : specify the version of the MQTT protocol to use when connecting.
            Can be mqttv5, mqttv311 or mqttv31. Defaults to mqttv311.
       -x : Set the session-expiry-interval property on the CONNECT packet. Applies to MQTT v5
            clients only. Set to 0-4294967294 to specify the session will expire in that many
             seconds after the client disconnects, or use -1, 4294967295, or ∞ for a session
            that does not expire. Defaults to -1 if -c is also given, or 0 if -c not given.
       --help : display this message.
       --nodelay : disable Nagle's algorithm, to reduce socket sending latency at the possible
                   expense of more packets being sent.
       --pretty : print formatted output rather than minimised output when using the
                  JSON output format option.
       --quiet : don't print error messages.
       --random-filter : only print a percentage of received messages. Set to 100 to have all
                         messages printed, 50.0 to have half of the messages received on average
                         printed, and so on.
       --retained-only : only handle messages with the retained flag set, and exit when the
                         first non-retained message is received.
       --remove-retained : send a message to the server to clear any received retained messages
                           Use -T to filter out messages you do not want to be cleared.
       --unix : connect to a broker through a unix domain socket instead of a TCP socket,
                e.g. /tmp/mosquitto.sock
       --will-payload : payload for the client Will, which is sent by the broker in case of
                        unexpected disconnection. If not given and will-topic is set, a zero
                        length message will be sent.
       --will-qos : QoS level for the client Will.
       --will-retain : if given, make the client Will retained.
       --will-topic : the topic on which to publish the client Will.
       --cafile : path to a file containing trusted CA certificates to enable encrypted
                  certificate based communication.
       --capath : path to a directory containing trusted CA certificates to enable encrypted
                  communication.
       --cert : client certificate for authentication, if required by server.
       --key : client private key for authentication, if required by server.
       --keyform : keyfile type, can be either "pem" or "engine".
       --ciphers : openssl compatible list of TLS ciphers to support.
       --tls-version : TLS protocol version, can be one of tlsv1.3 tlsv1.2 or tlsv1.1.
                       Defaults to tlsv1.2 if available.
       --insecure : do not check that the server certificate hostname matches the remote
                    hostname. Using this option means that you cannot be sure that the
                    remote host is the server you wish to connect to and so is insecure.
                    Do not use this option in a production environment.
       --tls-engine : If set, enables the use of a SSL engine device.
       --tls-engine-kpass-sha1 : SHA1 of the key password to be used with the selected SSL engine.
       --tls-use-os-certs : Load and trust OS provided CA certificates.
       --psk : pre-shared-key in hexadecimal (no leading 0x) to enable TLS-PSK mode.
       --psk-identity : client identity string for TLS-PSK mode.
       --proxy : SOCKS5 proxy URL of the form:
                 socks5h://[username[:password]@]hostname[:port]
                 Only "none" and "username" authentication is supported.
      
      See https://mosquitto.org/ for more information.
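The -L option packs the same information as -h, -p, -u, -P and -t into one URL of the form mqtt(s)://[username[:password]@]host[:port]/topic. The Java sketch below breaks such a URL into the equivalent separate options, purely to show how the pieces correspond. `MosquittoUrl` is a hypothetical helper, the mapping is my reading of the help text rather than mosquitto's own parser, and choosing 8883 for mqtts URLs without a port is an assumption taken from the -p description.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: maps a -L style URL onto the separate mosquitto_sub options. */
final class MosquittoUrl {

    static List<String> toOptions(String url) {
        URI uri = URI.create(url);
        String scheme = uri.getScheme();
        if (!"mqtt".equals(scheme) && !"mqtts".equals(scheme)) {
            throw new IllegalArgumentException("expected an mqtt:// or mqtts:// URL, got: " + url);
        }
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("URL has no host: " + url);
        }
        List<String> options = new ArrayList<>();
        options.add("-h");
        options.add(uri.getHost());
        // Assumption: 1883 for plain MQTT and 8883 for MQTT over TLS when no port is given.
        int defaultPort = "mqtts".equals(scheme) ? 8883 : 1883;
        options.add("-p");
        options.add(String.valueOf(uri.getPort() == -1 ? defaultPort : uri.getPort()));
        String userInfo = uri.getUserInfo();
        if (userInfo != null) {
            String[] parts = userInfo.split(":", 2);
            options.add("-u");
            options.add(parts[0]);
            if (parts.length == 2) {
                options.add("-P");
                options.add(parts[1]);
            }
        }
        // The topic is the path without its leading slash. A '#' wildcard would be read
        // as a URI fragment by java.net.URI, so this sketch handles plain topic names only.
        String path = uri.getPath();
        if (path != null && path.length() > 1) {
            options.add("-t");
            options.add(path.substring(1));
        }
        return options;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", toOptions("mqtt://user:secret@localhost:1883/topic1")));
    }
}
```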
      
    • Publish a message to a topic (mosquitto_pub)

      Usage: mosquitto_pub {[-h host] [--unix path] [-p port] [-u username] [-P password] -t topic | -L URL}
                           {-f file | -l | -n | -m message}
                           [-c] [-k keepalive] [-q qos] [-r] [--repeat N] [--repeat-delay time] [-x session-expiry]
                           [-A bind_address] [--nodelay]
                           [-i id] [-I id_prefix]
                           [-d] [--quiet]
                           [-M max_inflight]
                           [-u username [-P password]]
                           [--will-topic [--will-payload payload] [--will-qos qos] [--will-retain]]
                           [{--cafile file | --capath dir} [--cert file] [--key file]
                             [--ciphers ciphers] [--insecure]
                             [--tls-alpn protocol]
                             [--tls-engine engine] [--keyform keyform] [--tls-engine-kpass-sha1]]
                             [--tls-use-os-certs]
                           [--psk hex-key --psk-identity identity [--ciphers ciphers]]
                           [--proxy socks-url]
                           [--property command identifier value]
                           [-D command identifier value]
             mosquitto_pub --help
      
       -A : bind the outgoing socket to this host/ip address. Use to control which interface
            the client communicates over.
       -d : enable debug messages.
       -c : disable clean session/enable persistent client mode
            When this argument is used, the broker will be instructed not to clean existing sessions
            for the same client id when the client connects, and sessions will never expire when the
            client disconnects. MQTT v5 clients can change their session expiry interval with the -x
            argument.
       -D : Define MQTT v5 properties. See the documentation for more details.
       -f : send the contents of a file as the message.
       -h : mqtt host to connect to. Defaults to localhost.
       -i : id to use for this client. Defaults to mosquitto_pub_ appended with the process id.
       -I : define the client id as id_prefix appended with the process id. Useful for when the
            broker is using the clientid_prefixes option.
       -k : keep alive in seconds for this client. Defaults to 60.
       -L : specify user, password, hostname, port and topic as a URL in the form:
            mqtt(s)://[username[:password]@]host[:port]/topic
       -l : read messages from stdin, sending a separate message for each line.
       -m : message payload to send.
       -M : the maximum inflight messages for QoS 1/2..
       -n : send a null (zero length) message.
       -p : network port to connect to. Defaults to 1883 for plain MQTT and 8883 for MQTT over TLS.
       -P : provide a password
       -q : quality of service level to use for all messages. Defaults to 0.
       -r : message should be retained.
       -s : read message from stdin, sending the entire input as a message.
       -t : mqtt topic to publish to.
       -u : provide a username
       -V : specify the version of the MQTT protocol to use when connecting.
            Can be mqttv5, mqttv311 or mqttv31. Defaults to mqttv311.
       -x : Set the session-expiry-interval property on the CONNECT packet. Applies to MQTT v5
            clients only. Set to 0-4294967294 to specify the session will expire in that many
             seconds after the client disconnects, or use -1, 4294967295, or ∞ for a session
            that does not expire. Defaults to -1 if -c is also given, or 0 if -c not given.
       --help : display this message.
       --nodelay : disable Nagle's algorithm, to reduce socket sending latency at the possible
                   expense of more packets being sent.
       --quiet : don't print error messages.
       --repeat : if publish mode is -f, -m, or -s, then repeat the publish N times.
       --repeat-delay : if using --repeat, wait time seconds between publishes. Defaults to 0.
       --unix : connect to a broker through a unix domain socket instead of a TCP socket,
                e.g. /tmp/mosquitto.sock
       --will-payload : payload for the client Will, which is sent by the broker in case of
                        unexpected disconnection. If not given and will-topic is set, a zero
                        length message will be sent.
       --will-qos : QoS level for the client Will.
       --will-retain : if given, make the client Will retained.
       --will-topic : the topic on which to publish the client Will.
       --cafile : path to a file containing trusted CA certificates to enable encrypted
                  communication.
       --capath : path to a directory containing trusted CA certificates to enable encrypted
                  communication.
       --cert : client certificate for authentication, if required by server.
       --key : client private key for authentication, if required by server.
       --keyform : keyfile type, can be either "pem" or "engine".
       --ciphers : openssl compatible list of TLS ciphers to support.
       --tls-version : TLS protocol version, can be one of tlsv1.3 tlsv1.2 or tlsv1.1.
                       Defaults to tlsv1.2 if available.
       --insecure : do not check that the server certificate hostname matches the remote
                    hostname. Using this option means that you cannot be sure that the
                    remote host is the server you wish to connect to and so is insecure.
                    Do not use this option in a production environment.
       --tls-engine : If set, enables the use of a TLS engine device.
       --tls-engine-kpass-sha1 : SHA1 of the key password to be used with the selected SSL engine.
       --tls-use-os-certs : Load and trust OS provided CA certificates.
       --psk : pre-shared-key in hexadecimal (no leading 0x) to enable TLS-PSK mode.
       --psk-identity : client identity string for TLS-PSK mode.
       --proxy : SOCKS5 proxy URL of the form:
                 socks5h://[username[:password]@]hostname[:port]
                 Only "none" and "username" authentication is supported.