// MqttProducerCallback.java (1005 bytes)
package four;


import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * MQTT callback that forwards the payload of every arriving MQTT message to a
 * single, fixed Kafka topic as a keyless {@code String} record.
 *
 * <p>The Kafka producer is supplied by the caller; this class never closes it.
 */
public class MqttProducerCallback implements MqttCallback {
    private static final Logger LOGGER = Logger.getLogger(MqttProducerCallback.class.getName());

    private final KafkaProducer<String, String> kafkaProducer;
    private final String topic;

    /**
     * Creates a callback that publishes to the given Kafka topic.
     *
     * @param kafkaProducer producer used to send the records; must not be {@code null}
     * @param topic         name of the destination Kafka topic; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public MqttProducerCallback(KafkaProducer<String, String> kafkaProducer, String topic) {
        // Fail fast here rather than later, when the first message arrives.
        this.kafkaProducer = Objects.requireNonNull(kafkaProducer, "kafkaProducer");
        this.topic = Objects.requireNonNull(topic, "topic");
    }

    /**
     * Sends the payload of the arrived MQTT message to the configured Kafka topic.
     *
     * <p>The payload bytes are decoded as UTF-8 so the result does not depend on
     * the platform default charset. A failure reported through the send callback
     * is logged; an exception thrown directly by {@code send} propagates to the caller.
     *
     * @param topic   MQTT topic the message arrived on (used only in log output;
     *                the Kafka destination is the topic given to the constructor)
     * @param message the MQTT message whose payload is forwarded
     * @throws Exception if handing the record to the Kafka producer fails
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
        kafkaProducer.send(new ProducerRecord<>(this.topic, payload), (metadata, exception) -> {
            // A non-null exception means the record was not delivered.
            if (exception != null) {
                LOGGER.log(Level.SEVERE,
                        "Failed to forward MQTT message from '" + topic
                                + "' to Kafka topic '" + this.topic + "'",
                        exception);
            }
        });
    }

    /**
     * No-op: this callback does not act on delivery notifications.
     *
     * @param token delivery token of the completed message (ignored)
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Intentionally empty.
    }

    /**
     * Logs the loss of the MQTT connection. No reconnection is attempted here.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        LOGGER.log(Level.WARNING, "MQTT connection lost", cause);
    }
}