MqttProducerClient.java 1.9 KB
package four;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.util.Properties;

public class MqttProducerClient {
    // Kafka生产者客户端
    private final KafkaProducer<String, String> kafkaProducer;
    // Netty MQTT客户端
    private final MqttAsyncClient mqttClient;

    public MqttProducerClient(String clientId, String brokerUrl, String topic) throws MqttException {
        // 初始化Kafka生产者客户端
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerUrl);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.kafkaProducer = new KafkaProducer<>(props);

        // 初始化Netty MQTT客户端
        EventLoopGroup group = new NioEventLoopGroup();
        this.mqttClient = new MqttAsyncClient(brokerUrl, clientId, new MemoryPersistence());
        this.mqttClient.setCallback(new MqttProducerCallback(kafkaProducer, topic));
        this.mqttClient.connect(new MqttConnectOptions(), null).waitForCompletion();
        this.mqttClient.subscribe(topic, 1).waitForCompletion();
    }

    public void publish(String payload) throws MqttException {
        MqttMessage message = new MqttMessage(payload.getBytes());
        message.setQos(1);
        mqttClient.publish("topic", message).waitForCompletion();
    }

    public void close() throws MqttException {
        kafkaProducer.close();
        mqttClient.disconnect().waitForCompletion();
        mqttClient.close();
    }
}