// Main.java (1.8 KB)
package four;

import org.eclipse.paho.client.mqttv3.MqttException;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Demo entry point: creates three MQTT producer clients and three MQTT consumer clients on
 * the same broker URL and topic, then drives each group once per second from its own
 * single-threaded scheduler.
 *
 * <p>The scheduler threads are never shut down here, so the process keeps running until it
 * is terminated externally.
 */
public class Main {
    // NOTE(review): 9092 is conventionally Kafka's listener port; MQTT brokers usually listen
    // on 1883. Value kept as-is — confirm against the broker configuration.
    private static final String BROKER_URL = "tcp://localhost:9092";
    private static final String TOPIC = "topic";
    private static final int CLIENTS_PER_ROLE = 3;
    private static final long PERIOD_SECONDS = 1;

    /**
     * Builds the producer and consumer clients and starts the two periodic loops.
     *
     * @param args command-line arguments (unused)
     * @throws MqttException if constructing one of the clients fails
     */
    public static void main(String[] args) throws MqttException {
        List<MqttProducerClient> producers = new ArrayList<>();
        for (int i = 0; i < CLIENTS_PER_ROLE; i++) {
            producers.add(new MqttProducerClient("producer-" + i, BROKER_URL, TOPIC));
        }

        List<MqttConsumerClient> consumers = new ArrayList<>();
        for (int i = 0; i < CLIENTS_PER_ROLE; i++) {
            consumers.add(new MqttConsumerClient("consumer-" + i, BROKER_URL, TOPIC));
        }

        scheduleEverySecond(() -> publishToAll(producers));
        scheduleEverySecond(() -> consumeFromAll(consumers));
    }

    /** Runs {@code task} immediately and then at a fixed one-second rate on a dedicated thread. */
    private static void scheduleEverySecond(Runnable task) {
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(task, 0, PERIOD_SECONDS, TimeUnit.SECONDS);
    }

    /** Publishes one freshly generated reading through every producer. */
    private static void publishToAll(List<MqttProducerClient> producers) {
        for (MqttProducerClient producer : producers) {
            try {
                producer.publish(randomReading());
            } catch (MqttException | RuntimeException e) {
                // An exception escaping a scheduleAtFixedRate task suppresses every later run,
                // and the returned future is discarded, so the failure would also be invisible.
                // Report it here and carry on with the remaining clients.
                e.printStackTrace();
            }
        }
    }

    /** Invokes {@code consume()} once on every consumer. */
    private static void consumeFromAll(List<MqttConsumerClient> consumers) {
        for (MqttConsumerClient consumer : consumers) {
            try {
                consumer.consume();
            } catch (MqttException | RuntimeException e) {
                // Same reasoning as in publishToAll: never let one failure cancel the schedule.
                e.printStackTrace();
            }
        }
    }

    /**
     * Builds a JSON object string with random {@code temperature}, {@code humidity} and
     * {@code wind_speed} values, each drawn independently from {@code [0, 100)}.
     */
    private static String randomReading() {
        return "{\"temperature\":" + Math.random() * 100
                + ",\"humidity\":" + Math.random() * 100
                + ",\"wind_speed\":" + Math.random() * 100 + "}";
    }
}