Main.java
1.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package four;
import org.eclipse.paho.client.mqttv3.MqttException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) throws MqttException {
String brokerUrl = "tcp://localhost:9092";
String topic = "topic";
List<MqttProducerClient> producers = new ArrayList<>();
for (int i = 0; i < 3; i++) {
String clientId = "producer-" + i;
MqttProducerClient client = new MqttProducerClient(clientId, brokerUrl, topic);
producers.add(client);
}
List<MqttConsumerClient> consumers = new ArrayList<>();
for (int i = 0; i < 3; i++) {
String clientId = "consumer-" + i;
MqttConsumerClient client = new MqttConsumerClient(clientId, brokerUrl, topic);
consumers.add(client);
}
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
for (MqttProducerClient client : producers) {
try {
String payload = "{\"temperature\":" + Math.random() * 100 + ",\"humidity\":" + Math.random() * 100 + ",\"wind_speed\":" + Math.random() * 100 + "}";
client.publish(payload);
} catch (MqttException e) {
e.printStackTrace();
}
}
}, 0, 1, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
for (MqttConsumerClient client : consumers) {
try {
client.consume();
} catch (MqttException e) {
e.printStackTrace();
}
}
}, 0, 1, TimeUnit.SECONDS);
}
}