MqttProducerClient.java
1.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package four;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.Properties;
public class MqttProducerClient {
// Kafka生产者客户端
private final KafkaProducer<String, String> kafkaProducer;
// Netty MQTT客户端
private final MqttAsyncClient mqttClient;
public MqttProducerClient(String clientId, String brokerUrl, String topic) throws MqttException {
// 初始化Kafka生产者客户端
Properties props = new Properties();
props.put("bootstrap.servers", brokerUrl);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
this.kafkaProducer = new KafkaProducer<>(props);
// 初始化Netty MQTT客户端
EventLoopGroup group = new NioEventLoopGroup();
this.mqttClient = new MqttAsyncClient(brokerUrl, clientId, new MemoryPersistence());
this.mqttClient.setCallback(new MqttProducerCallback(kafkaProducer, topic));
this.mqttClient.connect(new MqttConnectOptions(), null).waitForCompletion();
this.mqttClient.subscribe(topic, 1).waitForCompletion();
}
public void publish(String payload) throws MqttException {
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(1);
mqttClient.publish("topic", message).waitForCompletion();
}
public void close() throws MqttException {
kafkaProducer.close();
mqttClient.disconnect().waitForCompletion();
mqttClient.close();
}
}