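Introduction to MQTT
- MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol built on the publish/subscribe pattern. It is widely used for device-to-device communication in scenarios such as the Internet of Things (IoT). To use MQTT from Java you need an MQTT client library, such as Eclipse Paho.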
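To make the publish/subscribe pattern concrete, here is a minimal in-memory sketch in plain Java. `TinyBroker` is a hypothetical class written only for this illustration; it is not MQTT and not part of any client library, and it matches topics by exact name only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a broker; not part of any MQTT library.
class TinyBroker {
    // topic name -> handlers registered for that topic
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Register a handler that is called for every message published to the topic.
    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    // Deliver the payload to every handler subscribed to exactly this topic.
    // Returns the number of handlers that received it.
    int publish(String topic, String payload) {
        List<Consumer<String>> handlers =
                subscribers.getOrDefault(topic, Collections.emptyList());
        for (Consumer<String> handler : handlers) {
            handler.accept(payload);
        }
        return handlers.size();
    }

    public static void main(String[] args) {
        TinyBroker broker = new TinyBroker();
        broker.subscribe("topic1", msg -> System.out.println("subscriber A got: " + msg));
        broker.subscribe("topic1", msg -> System.out.println("subscriber B got: " + msg));
        broker.publish("topic1", "Hello, MQTT!");   // both subscribers are called
        broker.publish("topic2", "nobody listens"); // no subscriber, nothing is printed
    }
}
```

The point of the pattern is that the publisher never refers to the subscribers directly; both sides only know the topic name.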
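Adding the dependency
Maven projects: add the following dependency to pom.xml:

```xml
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
    <version>1.2.5</version>
</dependency>
```

Gradle projects: add the following dependency to build.gradle:

```groovy
// MQTT v5 client, matching the org.eclipse.paho.mqttv5 imports used in the examples below
// (the MQTT v3 client is org.eclipse.paho:org.eclipse.paho.client.mqttv3)
implementation 'org.eclipse.paho:org.eclipse.paho.mqttv5.client:1.2.5'
```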
Creating an MQTT client and connecting to the server
Example code:

```java
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;

import java.nio.charset.StandardCharsets;

public class MqttExample {
    public static void main(String[] args) {
        try {
            // Create the MQTT client instance
            MqttClient client = new MqttClient("tcp://mqtt.example.com:1883", MqttClient.generateClientId());

            // Configure the connection options
            MqttConnectionOptions options = new MqttConnectionOptions();
            options.setUserName("your_username");
            // The v5 client takes the password as a byte array
            options.setPassword("your_password".getBytes(StandardCharsets.UTF_8));

            // Connect to the MQTT server; the synchronous client blocks until the connection is established
            client.connect(options);
            System.out.println("Connected to the MQTT server");

            // Publish and subscribe operations go here
            // ......

            // Disconnect
            client.disconnect();
            System.out.println("Disconnected from the MQTT server");
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}
```

Code explanation:
- First, an MQTT client instance is created with the MqttClient class. The first constructor argument is the MQTT server address (here tcp://mqtt.example.com:1883); the second is the client ID, generated here with MqttClient.generateClientId().
- Next, an MqttConnectionOptions object is created to configure connection options such as the username and password. The v5 client takes the password as a byte array.
- Then the client's connect method is called with the connection options. On the synchronous MqttClient this call blocks until the connection is established and throws MqttException on failure; connectWithResult can be used instead when the resulting IMqttToken is needed.
- Once connected, publish and subscribe operations can go where the comment is. Finally, the disconnect method closes the connection to the server.
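The server address passed to the constructor has the form scheme://host:port. As a rough sketch of how such an address breaks down, the following uses `java.net.URI` from the standard library. `ServerUri` is a hypothetical helper, not part of Paho; the `ssl` scheme and the fallback ports (1883 for plain TCP, 8883 for TLS) are assumptions made for this illustration.

```java
import java.net.URI;

// Hypothetical helper for illustration; not part of the Paho API.
class ServerUri {
    final String scheme;
    final String host;
    final int port;

    private ServerUri(String scheme, String host, int port) {
        this.scheme = scheme;
        this.host = host;
        this.port = port;
    }

    // Parse "tcp://host[:port]" or "ssl://host[:port]".
    // Assumption: when no port is given, fall back to 1883 (tcp) or 8883 (ssl).
    static ServerUri parse(String text) {
        URI uri = URI.create(text);
        String scheme = uri.getScheme();
        if (!"tcp".equals(scheme) && !"ssl".equals(scheme)) {
            throw new IllegalArgumentException("unsupported scheme: " + scheme);
        }
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("missing host: " + text);
        }
        int port = uri.getPort() != -1
                ? uri.getPort()
                : ("ssl".equals(scheme) ? 8883 : 1883);
        return new ServerUri(scheme, uri.getHost(), port);
    }

    public static void main(String[] args) {
        ServerUri server = ServerUri.parse("tcp://mqtt.example.com:1883");
        System.out.println(server.scheme + " " + server.host + " " + server.port);
    }
}
```

Validating the address up front like this can give a clearer error message than a failed connection attempt, though the client library performs its own checks as well.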
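Subscribing to topics (Subscribe)
Example code:

```java
// Also requires: org.eclipse.paho.mqttv5.client.MqttCallback,
// org.eclipse.paho.mqttv5.client.MqttDisconnectResponse,
// org.eclipse.paho.mqttv5.common.packet.MqttProperties

// Register the callback before subscribing so that no message is missed
client.setCallback(new MqttCallback() {
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("Message received on topic " + topic + ": " + new String(message.getPayload()));
    }
    @Override
    public void deliveryComplete(IMqttToken token) { }
    @Override
    public void disconnected(MqttDisconnectResponse disconnectResponse) { }
    @Override
    public void mqttErrorOccurred(MqttException exception) { }
    @Override
    public void connectComplete(boolean reconnect, String serverURI) { }
    @Override
    public void authPacketArrived(int reasonCode, MqttProperties properties) { }
});

// Subscribe to topic1 at QoS 1
MqttSubscription[] subscriptions = { new MqttSubscription("topic1", 1) };
client.subscribe(subscriptions);
```

Code explanation:
- First, setCallback registers a callback object. When a message arrives on a subscribed topic, messageArrived is called; this is where the received message is handled, for example by printing its topic and content. The v5 MqttCallback interface also declares deliveryComplete, disconnected, mqttErrorOccurred, connectComplete and authPacketArrived, which are left empty here.
- Next, an array of MqttSubscription objects is created. Each MqttSubscription holds the topic name to subscribe to (here topic1) and a QoS (Quality of Service) level (here 1). There are three QoS levels: 0 means at most once delivery, 1 means at least once delivery, and 2 means exactly once delivery.
- Finally, the client's subscribe method subscribes to the topics in the array.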
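The three QoS levels described above can be captured in a small enum, which keeps invalid values out of the rest of the code. `QosLevel` is a hypothetical type introduced only for this sketch; the client library itself takes the QoS as a plain int.

```java
// Hypothetical enum for illustration; the client API takes the QoS as a plain int.
enum QosLevel {
    AT_MOST_ONCE(0),   // delivered zero or one time
    AT_LEAST_ONCE(1),  // delivered one or more times (duplicates possible)
    EXACTLY_ONCE(2);   // delivered exactly one time

    final int level;

    QosLevel(int level) {
        this.level = level;
    }

    // Map the numeric level used on the wire back to the enum constant.
    static QosLevel fromLevel(int level) {
        for (QosLevel q : values()) {
            if (q.level == level) {
                return q;
            }
        }
        throw new IllegalArgumentException("QoS must be 0, 1 or 2, got " + level);
    }

    public static void main(String[] args) {
        for (int level = 0; level <= 2; level++) {
            System.out.println(level + " -> " + fromLevel(level));
        }
    }
}
```

With such an enum, a call like `new MqttSubscription("topic1", QosLevel.AT_LEAST_ONCE.level)` reads more clearly than a bare 1.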
Publishing messages (Publish)
Example code:

```java
// Publish a message
MqttMessage message = new MqttMessage("Hello, MQTT!".getBytes());
message.setQos(1);
client.publish("topic1", message);
```

Code explanation:
- First, an MqttMessage object is created; the message content (here Hello, MQTT!) is converted to a byte array and passed to the constructor.
- Next, the QoS level is set on the message itself with setQos (here 1).
- Finally, the client's publish method publishes the message object to the given topic (here topic1).
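The publishing example converts the text with `getBytes()` and no arguments, which uses the platform default charset, so publisher and subscriber may disagree on non-ASCII text if they run on differently configured machines. A minimal sketch of encoding and decoding with an explicit charset follows; `PayloadCodec` is a hypothetical helper, and the choice of UTF-8 is an assumption rather than something MQTT requires of payloads.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; UTF-8 is an assumed convention between both sides.
class PayloadCodec {
    // Turn message text into the byte array that goes into the message payload.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Turn a received payload back into text using the same charset.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("Hello, MQTT!");
        System.out.println(payload.length + " bytes");
        System.out.println(decode(payload));
    }
}
```

In the examples above this would mean `new MqttMessage(PayloadCodec.encode("Hello, MQTT!"))` on the publishing side and `PayloadCodec.decode(message.getPayload())` inside messageArrived.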
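MQTT command-line tools on Windows
On Windows you can install Mosquitto to subscribe to and publish MQTT messages from the command line. Some of the commonly used options are listed below.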
Subscribing to topics (mosquitto_sub)

```
Usage: mosquitto_sub {[-h host] [--unix path] [-p port] [-u username] [-P password] -t topic | -L URL [-t topic]}
                     [-c] [-k keepalive] [-q qos] [-x session-expiry-interval]
                     [-C msg_count] [-E] [-R] [--retained-only] [--remove-retained] [-T filter_out] [-U topic ...]
                     [-F format]
                     [-A bind_address] [--nodelay]
                     [-i id] [-I id_prefix]
                     [-d] [-N] [--quiet] [-v]
                     [--will-topic [--will-payload payload] [--will-qos qos] [--will-retain]]
                     [{--cafile file | --capath dir} [--cert file] [--key file]
                       [--ciphers ciphers] [--insecure]
                       [--tls-alpn protocol]
                       [--tls-engine engine] [--keyform keyform] [--tls-engine-kpass-sha1]]
                       [--tls-use-os-certs]
                     [--psk hex-key --psk-identity identity [--ciphers ciphers]]
                     [--proxy socks-url]
                     [-D command identifier value]
       mosquitto_sub --help

 -A : bind the outgoing socket to this host/ip address. Use to control which interface
      the client communicates over.
 -c : disable clean session/enable persistent client mode
      When this argument is used, the broker will be instructed not to clean existing sessions
      for the same client id when the client connects, and sessions will never expire when the
      client disconnects. MQTT v5 clients can change their session expiry interval with the -x
      argument.
 -C : disconnect and exit after receiving the 'msg_count' messages.
 -d : enable debug messages.
 -D : Define MQTT v5 properties. See the documentation for more details.
 -E : Exit once all subscriptions have been acknowledged by the broker.
 -F : output format.
 -h : mqtt host to connect to. Defaults to localhost.
 -i : id to use for this client. Defaults to mosquitto_sub_ appended with the process id.
 -I : define the client id as id_prefix appended with the process id. Useful for when the
      broker is using the clientid_prefixes option.
 -k : keep alive in seconds for this client. Defaults to 60.
 -L : specify user, password, hostname, port and topic as a URL in the form:
      mqtt(s)://[username[:password]@]host[:port]/topic
 -N : do not add an end of line character when printing the payload.
 -p : network port to connect to. Defaults to 1883 for plain MQTT and 8883 for MQTT over TLS.
 -P : provide a password
 -q : quality of service level to use for the subscription. Defaults to 0.
 -R : do not print stale messages (those with retain set).
 -t : mqtt topic to subscribe to. May be repeated multiple times.
 -T : topic string to filter out of results. May be repeated.
 -u : provide a username
 -U : unsubscribe from a topic. May be repeated.
 -v : print published messages verbosely.
 -V : specify the version of the MQTT protocol to use when connecting.
      Can be mqttv5, mqttv311 or mqttv31. Defaults to mqttv311.
 -x : Set the session-expiry-interval property on the CONNECT packet. Applies to MQTT v5
      clients only. Set to 0-4294967294 to specify the session will expire in that many
      seconds after the client disconnects, or use -1, 4294967295, or ∞ for a session
      that does not expire. Defaults to -1 if -c is also given, or 0 if -c not given.
 --help : display this message.
 --nodelay : disable Nagle's algorithm, to reduce socket sending latency at the possible
             expense of more packets being sent.
 --pretty : print formatted output rather than minimised output when using the
            JSON output format option.
 --quiet : don't print error messages.
 --random-filter : only print a percentage of received messages. Set to 100 to have all
                   messages printed, 50.0 to have half of the messages received on average
                   printed, and so on.
 --retained-only : only handle messages with the retained flag set, and exit when the
                   first non-retained message is received.
 --remove-retained : send a message to the server to clear any received retained messages
                     Use -T to filter out messages you do not want to be cleared.
 --unix : connect to a broker through a unix domain socket instead of a TCP socket,
          e.g. /tmp/mosquitto.sock
 --will-payload : payload for the client Will, which is sent by the broker in case of
                  unexpected disconnection. If not given and will-topic is set, a zero
                  length message will be sent.
 --will-qos : QoS level for the client Will.
 --will-retain : if given, make the client Will retained.
 --will-topic : the topic on which to publish the client Will.
 --cafile : path to a file containing trusted CA certificates to enable encrypted
            certificate based communication.
 --capath : path to a directory containing trusted CA certificates to enable encrypted
            communication.
 --cert : client certificate for authentication, if required by server.
 --key : client private key for authentication, if required by server.
 --keyform : keyfile type, can be either "pem" or "engine".
 --ciphers : openssl compatible list of TLS ciphers to support.
 --tls-version : TLS protocol version, can be one of tlsv1.3 tlsv1.2 or tlsv1.1.
                 Defaults to tlsv1.2 if available.
 --insecure : do not check that the server certificate hostname matches the remote
              hostname. Using this option means that you cannot be sure that the
              remote host is the server you wish to connect to and so is insecure.
              Do not use this option in a production environment.
 --tls-engine : If set, enables the use of a SSL engine device.
 --tls-engine-kpass-sha1 : SHA1 of the key password to be used with the selected SSL engine.
 --tls-use-os-certs : Load and trust OS provided CA certificates.
 --psk : pre-shared-key in hexadecimal (no leading 0x) to enable TLS-PSK mode.
 --psk-identity : client identity string for TLS-PSK mode.
 --proxy : SOCKS5 proxy URL of the form:
           socks5h://[username[:password]@]hostname[:port]
           Only "none" and "username" authentication is supported.

See https://mosquitto.org/ for more information.
```

Publishing messages (mosquitto_pub)

```
Usage: mosquitto_pub {[-h host] [--unix path] [-p port] [-u username] [-P password] -t topic | -L URL}
                     {-f file | -l | -n | -m message}
                     [-c] [-k keepalive] [-q qos] [-r] [--repeat N] [--repeat-delay time] [-x session-expiry]
                     [-A bind_address] [--nodelay]
                     [-i id] [-I id_prefix]
                     [-d] [--quiet]
                     [-M max_inflight]
                     [-u username [-P password]]
                     [--will-topic [--will-payload payload] [--will-qos qos] [--will-retain]]
                     [{--cafile file | --capath dir} [--cert file] [--key file]
                       [--ciphers ciphers] [--insecure]
                       [--tls-alpn protocol]
                       [--tls-engine engine] [--keyform keyform] [--tls-engine-kpass-sha1]]
                       [--tls-use-os-certs]
                     [--psk hex-key --psk-identity identity [--ciphers ciphers]]
                     [--proxy socks-url]
                     [--property command identifier value]
                     [-D command identifier value]
       mosquitto_pub --help

 -A : bind the outgoing socket to this host/ip address. Use to control which interface
      the client communicates over.
 -d : enable debug messages.
 -c : disable clean session/enable persistent client mode
      When this argument is used, the broker will be instructed not to clean existing sessions
      for the same client id when the client connects, and sessions will never expire when the
      client disconnects. MQTT v5 clients can change their session expiry interval with the -x
      argument.
 -D : Define MQTT v5 properties. See the documentation for more details.
 -f : send the contents of a file as the message.
 -h : mqtt host to connect to. Defaults to localhost.
 -i : id to use for this client. Defaults to mosquitto_pub_ appended with the process id.
 -I : define the client id as id_prefix appended with the process id. Useful for when the
      broker is using the clientid_prefixes option.
 -k : keep alive in seconds for this client. Defaults to 60.
 -L : specify user, password, hostname, port and topic as a URL in the form:
      mqtt(s)://[username[:password]@]host[:port]/topic
 -l : read messages from stdin, sending a separate message for each line.
 -m : message payload to send.
 -M : the maximum inflight messages for QoS 1/2..
 -n : send a null (zero length) message.
 -p : network port to connect to. Defaults to 1883 for plain MQTT and 8883 for MQTT over TLS.
 -P : provide a password
 -q : quality of service level to use for all messages. Defaults to 0.
 -r : message should be retained.
 -s : read message from stdin, sending the entire input as a message.
 -t : mqtt topic to publish to.
 -u : provide a username
 -V : specify the version of the MQTT protocol to use when connecting.
      Can be mqttv5, mqttv311 or mqttv31. Defaults to mqttv311.
 -x : Set the session-expiry-interval property on the CONNECT packet. Applies to MQTT v5
      clients only. Set to 0-4294967294 to specify the session will expire in that many
      seconds after the client disconnects, or use -1, 4294967295, or ∞ for a session
      that does not expire. Defaults to -1 if -c is also given, or 0 if -c not given.
 --help : display this message.
 --nodelay : disable Nagle's algorithm, to reduce socket sending latency at the possible
             expense of more packets being sent.
 --quiet : don't print error messages.
 --repeat : if publish mode is -f, -m, or -s, then repeat the publish N times.
 --repeat-delay : if using --repeat, wait time seconds between publishes. Defaults to 0.
 --unix : connect to a broker through a unix domain socket instead of a TCP socket,
          e.g. /tmp/mosquitto.sock
 --will-payload : payload for the client Will, which is sent by the broker in case of
                  unexpected disconnection. If not given and will-topic is set, a zero
                  length message will be sent.
 --will-qos : QoS level for the client Will.
 --will-retain : if given, make the client Will retained.
 --will-topic : the topic on which to publish the client Will.
 --cafile : path to a file containing trusted CA certificates to enable encrypted
            communication.
 --capath : path to a directory containing trusted CA certificates to enable encrypted
            communication.
 --cert : client certificate for authentication, if required by server.
 --key : client private key for authentication, if required by server.
 --keyform : keyfile type, can be either "pem" or "engine".
 --ciphers : openssl compatible list of TLS ciphers to support.
 --tls-version : TLS protocol version, can be one of tlsv1.3 tlsv1.2 or tlsv1.1.
                 Defaults to tlsv1.2 if available.
 --insecure : do not check that the server certificate hostname matches the remote
              hostname. Using this option means that you cannot be sure that the
              remote host is the server you wish to connect to and so is insecure.
              Do not use this option in a production environment.
 --tls-engine : If set, enables the use of a TLS engine device.
 --tls-engine-kpass-sha1 : SHA1 of the key password to be used with the selected SSL engine.
 --tls-use-os-certs : Load and trust OS provided CA certificates.
 --psk : pre-shared-key in hexadecimal (no leading 0x) to enable TLS-PSK mode.
 --psk-identity : client identity string for TLS-PSK mode.
 --proxy : SOCKS5 proxy URL of the form:
           socks5h://[username[:password]@]hostname[:port]
           Only "none" and "username" authentication is supported.
```
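As a rough illustration of how the most common options fit together, the sketch below assembles a mosquitto_pub argument list in Java. `MosquittoPubCommand` is a hypothetical helper written for this example; it uses only the -h, -p, -t, -q and -m options from the help text above, and the host, port and topic values are placeholders.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; it only builds the argument list and runs nothing.
class MosquittoPubCommand {
    // Build: mosquitto_pub -h <host> -p <port> -t <topic> -q <qos> -m <message>
    static List<String> build(String host, int port, String topic, int qos, String message) {
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException("QoS must be 0, 1 or 2, got " + qos);
        }
        List<String> command = new ArrayList<>();
        command.add("mosquitto_pub");
        command.add("-h");
        command.add(host);
        command.add("-p");
        command.add(String.valueOf(port));
        command.add("-t");
        command.add(topic);
        command.add("-q");
        command.add(String.valueOf(qos));
        command.add("-m");
        command.add(message);
        return command;
    }

    public static void main(String[] args) {
        List<String> command = build("localhost", 1883, "topic1", 1, "Hello, MQTT!");
        // Printed for inspection only; a shell would need the message argument quoted.
        System.out.println(String.join(" ", command));
    }
}
```

Assuming mosquitto_pub is installed and on the PATH, the returned list could be handed to `new ProcessBuilder(command)` to actually run it; each list element is passed as a separate argument, so the message needs no extra quoting in that case.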