MqttConsumerClient.java 2.4 KB
package four;


import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MqttConsumerClient {
    // Kafka消费者客户端
    private final KafkaConsumer<String, String> kafkaConsumer;
    // Netty MQTT客户端
    private final MqttAsyncClient mqttClient;

    public MqttConsumerClient(String clientId, String brokerUrl, String topic) throws MqttException {
        // 初始化Kafka消费者客户端
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerUrl);
        props.put("group.id", clientId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.kafkaConsumer = new KafkaConsumer<>(props);
        this.kafkaConsumer.subscribe(Collections.singletonList(topic));

        // 初始化Netty MQTT客户端
        EventLoopGroup group = new NioEventLoopGroup();
        this.mqttClient = new MqttAsyncClient(brokerUrl, clientId, new MemoryPersistence());
        this.mqttClient.setCallback(new MqttConsumerCallback());
        this.mqttClient.connect(new MqttConnectOptions(), null).waitForCompletion();
        this.mqttClient.subscribe(topic, 1).waitForCompletion();
    }

    public void consume() throws MqttException {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
            String payload = record.value();
            MqttMessage message = new MqttMessage(payload.getBytes());
            message.setQos(1);
            mqttClient.publish("topic", message).waitForCompletion();
        }
    }

    public void close() throws MqttException {
        kafkaConsumer.close();
        mqttClient.disconnect().waitForCompletion();
        mqttClient.close();
    }
}