// MqttProducerCallback.java
// 1005 bytes
package four;
import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * MQTT callback that forwards every arriving MQTT message to a single Kafka topic.
 *
 * <p>The payload is converted with {@link MqttMessage#toString()} and sent as the value of a
 * key-less record. The MQTT topic a message arrived on is not used for routing: every record goes
 * to the Kafka topic supplied at construction time.
 *
 * <p>This class does not create or close the {@link KafkaProducer}; its lifecycle stays with the
 * caller.
 */
public class MqttProducerCallback implements MqttCallback {

    private static final Logger LOGGER = Logger.getLogger(MqttProducerCallback.class.getName());

    private final KafkaProducer<String, String> kafkaProducer;
    private final String kafkaTopic;

    /**
     * Creates a callback that publishes to the given Kafka topic.
     *
     * @param kafkaProducer producer used to send records; must not be {@code null}
     * @param topic name of the Kafka topic every message is forwarded to; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public MqttProducerCallback(KafkaProducer<String, String> kafkaProducer, String topic) {
        // Fail fast here instead of on the first message that arrives.
        this.kafkaProducer = Objects.requireNonNull(kafkaProducer, "kafkaProducer");
        this.kafkaTopic = Objects.requireNonNull(topic, "topic");
    }

    /**
     * Forwards the arrived MQTT message to the configured Kafka topic.
     *
     * <p>The send is asynchronous; a failure reported later by the producer is logged rather than
     * thrown, because this method has already returned by then.
     *
     * @param mqttTopic MQTT topic the message was published on; used only for diagnostics
     * @param message the received MQTT message
     * @throws Exception if the producer rejects the record synchronously
     */
    @Override
    public void messageArrived(String mqttTopic, MqttMessage message) throws Exception {
        // NOTE(review): MqttMessage.toString() presumably decodes the payload with the platform
        // default charset - confirm whether an explicit charset (e.g. UTF-8) is required.
        ProducerRecord<String, String> record =
                new ProducerRecord<>(kafkaTopic, message.toString());

        // NOTE(review): an exception thrown synchronously by send() propagates to the MQTT client,
        // which reportedly shuts the client down - confirm this is the desired behaviour.
        kafkaProducer.send(record, (metadata, exception) -> {
            if (exception != null) {
                LOGGER.log(
                        Level.SEVERE,
                        "Failed to forward MQTT message from '" + mqttTopic
                                + "' to Kafka topic '" + kafkaTopic + "'",
                        exception);
            }
        });
    }

    /**
     * Does nothing: this class never publishes MQTT messages itself, so there is no delivery to
     * track.
     *
     * @param token delivery token of the completed publish; ignored
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // Intentionally empty.
    }

    /**
     * Logs the loss of the MQTT connection. Reconnecting is left to the owner of the MQTT client.
     *
     * @param cause reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        LOGGER.log(Level.WARNING, "MQTT connection lost", cause);
    }
}