MqttConsumerClient.java
2.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package four;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Bridges a Kafka topic to an MQTT topic: records polled from Kafka are
 * re-published as MQTT messages with QoS 1.
 *
 * <p>The same {@code topic} name is used for the Kafka subscription, the MQTT
 * subscription and the MQTT publish target.
 *
 * <p>Instances hold a Kafka consumer and an MQTT connection; call
 * {@link #close()} (or use try-with-resources) to release them.
 */
public class MqttConsumerClient implements AutoCloseable {
    // Kafka consumer client
    private final KafkaConsumer<String, String> kafkaConsumer;
    // MQTT client (Eclipse Paho asynchronous client)
    private final MqttAsyncClient mqttClient;
    // Topic given at construction time; used for both subscriptions and for publishing
    private final String topic;

    /**
     * Creates the Kafka consumer, subscribes it to {@code topic}, then connects
     * the MQTT client and subscribes it to the same topic with QoS 1.
     *
     * @param clientId  used as the Kafka {@code group.id} and as the MQTT client id
     * @param brokerUrl used as the Kafka {@code bootstrap.servers} value and as the MQTT server URI
     * @param topic     topic name used on both the Kafka and the MQTT side
     * @throws MqttException if creating, connecting or subscribing the MQTT client fails;
     *                       any resources opened so far are released before rethrowing
     */
    public MqttConsumerClient(String clientId, String brokerUrl, String topic) throws MqttException {
        this.topic = topic;

        // Initialise the Kafka consumer client.
        // NOTE(review): brokerUrl is shared between Kafka and MQTT, although the two
        // normally expect different address formats/ports - confirm this is intended.
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerUrl);
        props.put("group.id", clientId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Initialise the MQTT client. Built in locals first so that a failure
        // half-way through does not leak the already opened Kafka consumer.
        MqttAsyncClient client = null;
        try {
            consumer.subscribe(Collections.singletonList(topic));
            client = new MqttAsyncClient(brokerUrl, clientId, new MemoryPersistence());
            client.setCallback(new MqttConsumerCallback());
            client.connect(new MqttConnectOptions(), null).waitForCompletion();
            client.subscribe(topic, 1).waitForCompletion();
        } catch (MqttException | RuntimeException e) {
            try {
                consumer.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            if (client != null) {
                try {
                    if (client.isConnected()) {
                        client.disconnect().waitForCompletion();
                    }
                    client.close();
                } catch (MqttException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            throw e;
        }

        this.kafkaConsumer = consumer;
        this.mqttClient = client;
    }

    /**
     * Polls Kafka once (waiting up to one second) and publishes every returned
     * record value to the MQTT topic with QoS 1, blocking until each publish
     * completes.
     *
     * <p>Records whose value is {@code null} are skipped.
     *
     * @throws MqttException if publishing a message fails
     */
    public void consume() throws MqttException {
        ConsumerRecords<String, String> batch = kafkaConsumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> entry : batch) {
            String payload = entry.value();
            if (payload == null) {
                // Nothing to forward; calling getBytes on a null value would throw.
                continue;
            }
            // Explicit charset: the no-arg getBytes() depends on the platform default.
            MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
            message.setQos(1);
            // Publish to the configured topic rather than a hard-coded literal.
            mqttClient.publish(topic, message).waitForCompletion();
        }
    }

    /**
     * Closes the Kafka consumer, then disconnects and closes the MQTT client.
     * Each step is attempted even if an earlier one throws.
     *
     * @throws MqttException if disconnecting or closing the MQTT client fails
     */
    @Override
    public void close() throws MqttException {
        try {
            kafkaConsumer.close();
        } finally {
            try {
                mqttClient.disconnect().waitForCompletion();
            } finally {
                mqttClient.close();
            }
        }
    }
}