作者 邓耀骏

feat(two_c):实现二更改为C实现

<component name="ArtifactManager">
<artifact type="jar" name="client:jar">
<output-path>$PROJECT_DIR$/out/artifacts/client_jar</output-path>
<root id="archive" name="Software_Architecture_Experiment_onetwoFour.jar">
<element id="module-output" name="Software_Architecture_Experiment_onetwoFour" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-dns/4.1.92.Final/netty-codec-dns-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-classes-kqueue/4.1.92.Final/netty-transport-classes-kqueue-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-sctp/4.1.92.Final/netty-transport-sctp-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-resolver-dns-classes-macos/4.1.92.Final/netty-resolver-dns-classes-macos-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-resolver-dns-native-macos/4.1.92.Final/netty-resolver-dns-native-macos-4.1.92.Final-osx-aarch_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-socks/4.1.92.Final/netty-codec-socks-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.2.5/org.eclipse.paho.client.mqttv3-1.2.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-http2/4.1.92.Final/netty-codec-http2-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-native-epoll/4.1.92.Final/netty-transport-native-epoll-4.1.92.Final-linux-x86_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/github/luben/zstd-jni/1.5.2-1/zstd-jni-1.5.2-1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-handler-ssl-ocsp/4.1.92.Final/netty-handler-ssl-ocsp-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport/4.1.92.Final/netty-transport-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-native-unix-common/4.1.92.Final/netty-transport-native-unix-common-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-rxtx/4.1.92.Final/netty-transport-rxtx-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-udt/4.1.92.Final/netty-transport-udt-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-native-kqueue/4.1.92.Final/netty-transport-native-kqueue-4.1.92.Final-osx-aarch_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-resolver/4.1.92.Final/netty-resolver-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-stomp/4.1.92.Final/netty-codec-stomp-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-common/4.1.92.Final/netty-common-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-memcache/4.1.92.Final/netty-codec-memcache-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.8.4/snappy-java-1.1.8.4.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-resolver-dns-native-macos/4.1.92.Final/netty-resolver-dns-native-macos-4.1.92.Final-osx-x86_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-buffer/4.1.92.Final/netty-buffer-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-mqtt/4.1.92.Final/netty-codec-mqtt-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-redis/4.1.92.Final/netty-codec-redis-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-handler/4.1.92.Final/netty-handler-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-native-kqueue/4.1.92.Final/netty-transport-native-kqueue-4.1.92.Final-osx-x86_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec/4.1.92.Final/netty-codec-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-all/4.1.92.Final/netty-all-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/3.2.3/kafka-clients-3.2.3.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-resolver-dns/4.1.92.Final/netty-resolver-dns-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-haproxy/4.1.92.Final/netty-codec-haproxy-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-native-epoll/4.1.92.Final/netty-transport-native-epoll-4.1.92.Final-linux-aarch_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-xml/4.1.92.Final/netty-codec-xml-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-handler-proxy/4.1.92.Final/netty-handler-proxy-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-smtp/4.1.92.Final/netty-codec-smtp-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-classes-epoll/4.1.92.Final/netty-transport-classes-epoll-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-http/4.1.92.Final/netty-codec-http-4.1.92.Final.jar" path-in-jar="/" />
</root>
</artifact>
</component>
\ No newline at end of file
... ...
<component name="ArtifactManager">
<artifact type="jar" name="server:jar">
<output-path>$PROJECT_DIR$/out/artifacts/server_jar</output-path>
<root id="archive" name="Software_Architecture_Experiment_onetwoFour.jar">
<element id="module-output" name="Software_Architecture_Experiment_onetwoFour" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-dns/4.1.92.Final/netty-codec-dns-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-classes-kqueue/4.1.92.Final/netty-transport-classes-kqueue-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-sctp/4.1.92.Final/netty-transport-sctp-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-resolver-dns-classes-macos/4.1.92.Final/netty-resolver-dns-classes-macos-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-resolver-dns-native-macos/4.1.92.Final/netty-resolver-dns-native-macos-4.1.92.Final-osx-aarch_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-socks/4.1.92.Final/netty-codec-socks-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.2.5/org.eclipse.paho.client.mqttv3-1.2.5.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-http2/4.1.92.Final/netty-codec-http2-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-native-epoll/4.1.92.Final/netty-transport-native-epoll-4.1.92.Final-linux-x86_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/com/github/luben/zstd-jni/1.5.2-1/zstd-jni-1.5.2-1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-handler-ssl-ocsp/4.1.92.Final/netty-handler-ssl-ocsp-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport/4.1.92.Final/netty-transport-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-native-unix-common/4.1.92.Final/netty-transport-native-unix-common-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-rxtx/4.1.92.Final/netty-transport-rxtx-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-udt/4.1.92.Final/netty-transport-udt-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-native-kqueue/4.1.92.Final/netty-transport-native-kqueue-4.1.92.Final-osx-aarch_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-resolver/4.1.92.Final/netty-resolver-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-stomp/4.1.92.Final/netty-codec-stomp-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-common/4.1.92.Final/netty-common-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-memcache/4.1.92.Final/netty-codec-memcache-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/xerial/snappy/snappy-java/1.1.8.4/snappy-java-1.1.8.4.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-resolver-dns-native-macos/4.1.92.Final/netty-resolver-dns-native-macos-4.1.92.Final-osx-x86_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-buffer/4.1.92.Final/netty-buffer-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-mqtt/4.1.92.Final/netty-codec-mqtt-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-redis/4.1.92.Final/netty-codec-redis-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-handler/4.1.92.Final/netty-handler-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-native-kqueue/4.1.92.Final/netty-transport-native-kqueue-4.1.92.Final-osx-x86_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec/4.1.92.Final/netty-codec-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-all/4.1.92.Final/netty-all-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/3.2.3/kafka-clients-3.2.3.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-resolver-dns/4.1.92.Final/netty-resolver-dns-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-haproxy/4.1.92.Final/netty-codec-haproxy-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-native-epoll/4.1.92.Final/netty-transport-native-epoll-4.1.92.Final-linux-aarch_64.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-xml/4.1.92.Final/netty-codec-xml-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-handler-proxy/4.1.92.Final/netty-handler-proxy-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-smtp/4.1.92.Final/netty-codec-smtp-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-transport-classes-epoll/4.1.92.Final/netty-transport-classes-epoll-4.1.92.Final.jar" path-in-jar="/" />
<element id="extracted-dir" path="$MAVEN_REPOSITORY$/io/netty/netty-codec-http/4.1.92.Final/netty-codec-http-4.1.92.Final.jar" path-in-jar="/" />
</root>
</artifact>
</component>
\ No newline at end of file
... ...
... ... @@ -14,6 +14,18 @@
<version>4.1.92.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.3</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
... ...
package four;
import org.eclipse.paho.client.mqttv3.MqttException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) throws MqttException {
String brokerUrl = "tcp://localhost:9092";
String topic = "topic";
List<MqttProducerClient> producers = new ArrayList<>();
for (int i = 0; i < 3; i++) {
String clientId = "producer-" + i;
MqttProducerClient client = new MqttProducerClient(clientId, brokerUrl, topic);
producers.add(client);
}
List<MqttConsumerClient> consumers = new ArrayList<>();
for (int i = 0; i < 3; i++) {
String clientId = "consumer-" + i;
MqttConsumerClient client = new MqttConsumerClient(clientId, brokerUrl, topic);
consumers.add(client);
}
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
for (MqttProducerClient client : producers) {
try {
String payload = "{\"temperature\":" + Math.random() * 100 + ",\"humidity\":" + Math.random() * 100 + ",\"wind_speed\":" + Math.random() * 100 + "}";
client.publish(payload);
} catch (MqttException e) {
e.printStackTrace();
}
}
}, 0, 1, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
for (MqttConsumerClient client : consumers) {
try {
client.consume();
} catch (MqttException e) {
e.printStackTrace();
}
}
}, 0, 1, TimeUnit.SECONDS);
}
}
\ No newline at end of file
... ...
package four;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttConsumerCallback implements MqttCallback {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Received message: " + message.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// do nothing
}
@Override
public void connectionLost(Throwable cause) {
// do nothing
}
}
... ...
package four;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class MqttConsumerClient {
// Kafka消费者客户端
private final KafkaConsumer<String, String> kafkaConsumer;
// Netty MQTT客户端
private final MqttAsyncClient mqttClient;
public MqttConsumerClient(String clientId, String brokerUrl, String topic) throws MqttException {
// 初始化Kafka消费者客户端
Properties props = new Properties();
props.put("bootstrap.servers", brokerUrl);
props.put("group.id", clientId);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.kafkaConsumer = new KafkaConsumer<>(props);
this.kafkaConsumer.subscribe(Collections.singletonList(topic));
// 初始化Netty MQTT客户端
EventLoopGroup group = new NioEventLoopGroup();
this.mqttClient = new MqttAsyncClient(brokerUrl, clientId, new MemoryPersistence());
this.mqttClient.setCallback(new MqttConsumerCallback());
this.mqttClient.connect(new MqttConnectOptions(), null).waitForCompletion();
this.mqttClient.subscribe(topic, 1).waitForCompletion();
}
public void consume() throws MqttException {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
String payload = record.value();
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(1);
mqttClient.publish("topic", message).waitForCompletion();
}
}
public void close() throws MqttException {
kafkaConsumer.close();
mqttClient.disconnect().waitForCompletion();
mqttClient.close();
}
}
... ...
package four;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttProducerCallback implements MqttCallback {
private final KafkaProducer<String, String> kafkaProducer;
private final String topic;
public MqttProducerCallback(KafkaProducer<String, String> kafkaProducer, String topic) {
this.kafkaProducer = kafkaProducer;
this.topic = topic;
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
kafkaProducer.send(new ProducerRecord<>(this.topic, message.toString()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// do nothing
}
@Override
public void connectionLost(Throwable cause) {
// do nothing
}
}
... ...
package four;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.Properties;
public class MqttProducerClient {
// Kafka生产者客户端
private final KafkaProducer<String, String> kafkaProducer;
// Netty MQTT客户端
private final MqttAsyncClient mqttClient;
public MqttProducerClient(String clientId, String brokerUrl, String topic) throws MqttException {
// 初始化Kafka生产者客户端
Properties props = new Properties();
props.put("bootstrap.servers", brokerUrl);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
this.kafkaProducer = new KafkaProducer<>(props);
// 初始化Netty MQTT客户端
EventLoopGroup group = new NioEventLoopGroup();
this.mqttClient = new MqttAsyncClient(brokerUrl, clientId, new MemoryPersistence());
this.mqttClient.setCallback(new MqttProducerCallback(kafkaProducer, topic));
this.mqttClient.connect(new MqttConnectOptions(), null).waitForCompletion();
this.mqttClient.subscribe(topic, 1).waitForCompletion();
}
public void publish(String payload) throws MqttException {
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(1);
mqttClient.publish("topic", message).waitForCompletion();
}
public void close() throws MqttException {
kafkaProducer.close();
mqttClient.disconnect().waitForCompletion();
mqttClient.close();
}
}
... ...
## Netty
Netty是一个基于Java的异步事件驱动的网络应用程序框架。它提供了一种高效、可扩展和可靠的网络编程解决方案,可以帮助开发者快速构建高性能、可靠的网络应用程序。
Netty的优点包括:
1. 高性能:Netty采用NIO(非阻塞I/O)模型,可以实现高并发、高吞吐量的网络通信,同时减少了线程的创建和销毁,提高了系统的性能。
2. 可扩展性:Netty提供了一套灵活的事件处理机制,可以方便地扩展和定制网络应用程序,同时支持多种协议和编解码器。
3. 可靠性:Netty支持TCP协议的可靠传输,可以自动重传丢失的数据包,保证了数据的可靠性和完整性。
4. 易用性:Netty提供了简洁且易于使用的API,同时支持多种编程模型,包括阻塞、非阻塞和事件驱动等,可以满足不同场景下的需求。
5. 安全性:Netty提供了一套完整的SSL/TLS支持,可以保证网络通信的安全性。
总之,Netty是一个功能强大、易于使用、可靠性高的网络编程框架,可以帮助开发者快速构建高性能、可靠的网络应用程序。
## Kafka
Kafka是一个分布式的、可扩展的、高吞吐量的消息队列系统。它最初由LinkedIn公司开发,现在已经成为Apache软件基金会的顶级项目之一。
Kafka的核心设计思想是将消息的生产者和消费者解耦,通过消息队列来实现异步通信。Kafka的消息以topic为单位进行分类和存储,生产者将消息发送到特定的topic,消费者从topic中读取消息。Kafka支持多个消费者同时消费同一个topic,同时支持消息的持久化存储和高可靠性的数据复制机制。
Kafka的优点包括:
1. 高吞吐量:Kafka采用基于磁盘的存储方式,可以支持海量数据的存储和高吞吐量的数据读写。
2. 可扩展性:Kafka的分布式架构可以方便地扩展集群规模,同时支持水平扩展和垂直扩展。
3. 可靠性:Kafka采用多副本机制,可以保证消息的可靠性和数据的一致性。
4. 灵活性:Kafka提供了灵活的消息处理机制,支持多种消息格式和编解码器。
5. 易用性:Kafka提供了简单易用的API和管理工具,可以方便地进行配置和管理。
总之,Kafka是一个可靠、高吞吐量的消息队列系统,可以帮助开发者构建分布式、可扩展的应用程序。Kafka广泛应用于大数据处理、实时数据流处理、日志收集和数据同步等场景。
## MQTT
MQTT(Message Queue Telemetry Transport)协议是一种轻量级的、基于发布/订阅模式的通信协议,通常用于物联网(IoT)和嵌入式系统中的设备间通信。MQTT协议是IBM公司开发的,现在已经成为OASIS标准。
MQTT协议的核心设计思想是将数据发布到一个主题(Topic)上,然后订阅该主题的设备可以接收到这些数据。MQTT协议支持多种消息传输质量,包括最多一次、至少一次和恰好一次,可以根据应用场景选择不同的传输质量。
MQTT协议的优点包括:
1. 轻量级:MQTT协议的数据包很小,传输效率高,适合在带宽有限的网络中使用。
2. 灵活性:MQTT协议支持多种消息传输质量,可以根据应用场景选择不同的传输质量。
3. 可靠性:MQTT协议支持消息的持久化存储和重发机制,可以保证消息的可靠性。
4. 易用性:MQTT协议提供了简单易用的API和客户端库,可以方便地进行开发和集成。
5. 支持广泛:MQTT协议已经成为物联网和嵌入式系统中的通信标准,得到了广泛的应用和支持。
总之,MQTT协议是一种轻量级、灵活、可靠的通信协议,适用于物联网和嵌入式系统中的设备间通信。MQTT协议已经成为物联网领域的重要标准之一,得到了广泛的应用和支持。
\ No newline at end of file
... ...
## 服务端使用
![image-20230507110716217](https://pic.lxtlovely.top/blog/202305071119289.png)
1. `cd /home/dyj`
2. `gcc server.c -o server` 编译为可执行文件 server
3. `gcc client.c -o client` 编译为可执行文件 client
4. `./server 8887` 开启服务端并设置端口为 :8877
## 客户端
![image-20230507111913598](https://pic.lxtlovely.top/blog/202305071143981.png)
1. `cd /home/dyj`
2. `./client 111.230.239.199 8887 f1` ./app <IP地址> <端口号> <名称>
![image-20230507111935188](https://pic.lxtlovely.top/blog/202305071143800.png)
1. `cd /home/dyj`
2. `./client 111.230.239.199 8887 f2`
... ...
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <dirent.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <pthread.h>
#include <sys/select.h>
#include <sys/time.h>
#include <poll.h>
#include <sys/epoll.h>
//消息结构体
struct MSG_DATA
{
char type; //消息类型. 0表示有聊天的消息数据 1表示好友上线 2表示好友下线
char name[50]; //好友名称
int number; //在线人数的数量
unsigned char buff[100]; //发送的聊天数据消息
};
struct MSG_DATA msg_data;
#define MAX_EVENTS 2
struct epoll_event ev, events[MAX_EVENTS];
int epollfd;
int nfds;
//文件接收端
int main(int argc,char **argv)
{
if(argc!=4)
{
printf("./app <IP地址> <端口号> <名称>\n");
return 0;
}
int sockfd;
//忽略 SIGPIPE 信号--方式服务器向无效的套接字写数据导致进程退出
signal(SIGPIPE,SIG_IGN);
/*1. 创建socket套接字*/
sockfd=socket(AF_INET,SOCK_STREAM,0);
/*2. 连接服务器*/
struct sockaddr_in addr;
addr.sin_family=AF_INET;
addr.sin_port=htons(atoi(argv[2])); // 端口号0~65535
addr.sin_addr.s_addr=inet_addr(argv[1]); //IP地址
if(connect(sockfd,(const struct sockaddr *)&addr,sizeof(struct sockaddr_in))!=0)
{
printf("客户端:服务器连接失败.\n");
return 0;
}
/*3. 发送消息表示上线*/
msg_data.type=1;
strcpy(msg_data.name,argv[3]);
write(sockfd,&msg_data,sizeof(struct MSG_DATA));
int cnt;
int i;
//创建专用文件描述符
epollfd = epoll_create(10);
//添加要监听的文件描述符
ev.events = EPOLLIN;
ev.data.fd = sockfd;
epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &ev);
ev.events = EPOLLIN;
ev.data.fd = 0; //标准输入文件描述符
epoll_ctl(epollfd, EPOLL_CTL_ADD, 0, &ev);
while(1)
{
//监听事件
nfds=epoll_wait(epollfd,events,MAX_EVENTS,-1);
if(nfds)
{
for(i=0;i<nfds;i++)
{
if(events[i].data.fd==sockfd) //判断收到服务器的消息
{
cnt=read(sockfd,&msg_data,sizeof(struct MSG_DATA));
if(cnt<=0) //判断服务器是否断开了连接
{
printf("服务器已经退出.\n");
goto SERVER_ERROR;
}
else if(cnt>0)
{
if(msg_data.type==0)
{
printf("%s:%s 在线人数:%d\n",msg_data.name,msg_data.buff,msg_data.number);
}
else if(msg_data.type==1)
{
printf("%s 好友上线. 在线人数:%d\n",msg_data.name,msg_data.number);
}
else if(msg_data.type==2)
{
printf("%s 好友下线. 在线人数:%d\n",msg_data.name,msg_data.number);
}
}
}
else if(events[i].data.fd==0) //表示键盘上有数据输入
{
gets(msg_data.buff); //读取键盘上的消息
msg_data.type=0; //表示正常消息
strcpy(msg_data.name,argv[3]); //名称
write(sockfd,&msg_data,sizeof(struct MSG_DATA));
}
}
}
}
SERVER_ERROR:
close(sockfd);
return 0;
}
\ No newline at end of file
... ...
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <dirent.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <pthread.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/epoll.h>
int sockfd;
//消息结构体
struct MSG_DATA
{
char type; //消息类型. 0表示有聊天的消息数据 1表示好友上线 2表示好友下线
char name[50]; //好友名称
int number; //在线人数的数量
unsigned char buff[100]; //发送的聊天数据消息
};
//存放当前服务器连接的客户端套接字
struct CLIENT_FD
{
int fd;
char name[50]; //名称
struct CLIENT_FD *next;
};
//定义链表头
struct CLIENT_FD *list_head=NULL;
struct CLIENT_FD *List_CreateHead(struct CLIENT_FD *list_head);
void List_AddNode(struct CLIENT_FD *list_head,int fd);
void List_DelNode(struct CLIENT_FD *list_head,int fd);
int List_GetNodeCnt(struct CLIENT_FD *list_head);
void Server_SendMsgData(struct CLIENT_FD *list_head,struct MSG_DATA *msg_data,int client_fd);
void List_SaveName(struct CLIENT_FD *list_head,struct MSG_DATA *msg_data,int client_fd);
void List_GetName(struct CLIENT_FD *list_head,struct MSG_DATA *msg_data,int client_fd);
#define MAX_EPOLL_FD 100
struct epoll_event events[MAX_EPOLL_FD];
struct epoll_event event;
int epfd;
int nfd;
struct MSG_DATA msg_data;
/*信号工作函数*/
void signal_work_func(int sig)
{
close(sockfd);
exit(0); //结束进程
}
int main(int argc,char **argv)
{
if(argc!=2)
{
printf("./app <端口号>\n");
return 0;
}
signal(SIGPIPE,SIG_IGN); //忽略 SIGPIPE 信号--防止服务器异常退出
signal(SIGINT,signal_work_func);
//创建链表头
list_head=List_CreateHead(list_head);
/*1. 创建socket套接字*/
sockfd=socket(AF_INET,SOCK_STREAM,0);
//设置端口号的复用功能
int on = 1;
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
/*2. 绑定端口号与IP地址*/
struct sockaddr_in addr;
addr.sin_family=AF_INET;
addr.sin_port=htons(atoi(argv[1])); // 端口号0~65535
addr.sin_addr.s_addr=INADDR_ANY; //inet_addr("0.0.0.0"); //IP地址
if(bind(sockfd,(const struct sockaddr *)&addr,sizeof(struct sockaddr))!=0)
{
printf("服务器:端口号绑定失败.\n");
}
/*3. 设置监听的数量*/
listen(sockfd,20);
/*4. 等待客户端连接*/
int client_fd;
struct sockaddr_in client_addr;
socklen_t addrlen;
int i;
int cnt;
/*5. 创建epoll相关的接口*/
epfd=epoll_create(MAX_EPOLL_FD);
event.events=EPOLLIN; //监听的事件
event.data.fd=sockfd; //监听的套接字
epoll_ctl(epfd,EPOLL_CTL_ADD,sockfd,&event);
while(1)
{
//等待事件发生
nfd=epoll_wait(epfd,events,MAX_EPOLL_FD,-1);
for(i=0;i<nfd;i++)
{
if(events[i].data.fd==sockfd) //表示有新的客户端连接上服务器
{
client_fd=accept(sockfd,(struct sockaddr *)&client_addr,&addrlen);
printf("连接的客户端IP地址:%s\n",inet_ntoa(client_addr.sin_addr));
printf("连接的客户端端口号:%d\n",ntohs(client_addr.sin_port));
//保存已经连接上来的客户端
List_AddNode(list_head,client_fd);
//将新连接的客户端套接字添加到epoll函数监听队列里
event.data.fd=client_fd; //监听的套接字
epoll_ctl(epfd,EPOLL_CTL_ADD,client_fd,&event);
}
else //表示客户端给服务器发送了消息-----实现消息的转发
{
//读取客户端发送的消息
cnt=read(events[i].data.fd,&msg_data,sizeof(struct MSG_DATA));
if(cnt<=0) //表示当前客户端断开了连接
{
//获取名称
List_GetName(list_head,&msg_data,events[i].data.fd);
//删除节点
List_DelNode(list_head,events[i].data.fd);
msg_data.type=2;
//将断开连接的客户端套接字从epoll函数监听队列里删除调用
event.data.fd=events[i].data.fd; //监听的套接字
epoll_ctl(epfd,EPOLL_CTL_DEL,events[i].data.fd,&event);
close(event.data.fd);
}
if(msg_data.type==1) //好友上线的时候保存一次名称
{
//保存名称
List_SaveName(list_head,&msg_data,events[i].data.fd);
}
//转发消息给其他好友
msg_data.number=List_GetNodeCnt(list_head); //当前在线好友人数
Server_SendMsgData(list_head,&msg_data,events[i].data.fd);
}
}
}
//退出进程
signal_work_func(0);
return 0;
}
/*
函数功能: 创建链表头
*/
struct CLIENT_FD *List_CreateHead(struct CLIENT_FD *list_head)
{
if(list_head==NULL)
{
list_head=malloc(sizeof(struct CLIENT_FD));
list_head->next=NULL;
}
return list_head;
}
/*
函数功能: 添加节点
*/
void List_AddNode(struct CLIENT_FD *list_head,int fd)
{
struct CLIENT_FD *p=list_head;
struct CLIENT_FD *new_p;
while(p->next!=NULL)
{
p=p->next;
}
new_p=malloc(sizeof(struct CLIENT_FD));
new_p->next=NULL;
new_p->fd=fd;
p->next=new_p;
}
/*
函数功能: 删除节点
*/
void List_DelNode(struct CLIENT_FD *list_head,int fd)
{
struct CLIENT_FD *p=list_head;
struct CLIENT_FD *tmp;
while(p->next!=NULL)
{
tmp=p;
p=p->next;
if(p->fd==fd) //找到了要删除的节点
{
tmp->next=p->next;
free(p);
break;
}
}
}
/*
函数功能: 获取当前链表中有多少个节点
*/
int List_GetNodeCnt(struct CLIENT_FD *list_head)
{
int cnt=0;
struct CLIENT_FD *p=list_head;
while(p->next!=NULL)
{
p=p->next;
cnt++;
}
return cnt;
}
/*
函数功能: 转发消息
*/
void Server_SendMsgData(struct CLIENT_FD *list_head,struct MSG_DATA *msg_data,int client_fd)
{
struct CLIENT_FD *p=list_head;
while(p->next!=NULL)
{
p=p->next;
if(p->fd!=client_fd)
{
write(p->fd,msg_data,sizeof(struct MSG_DATA));
}
}
}
/*
函数功能: 保存好友的名称
*/
void List_SaveName(struct CLIENT_FD *list_head,struct MSG_DATA *msg_data,int client_fd)
{
struct CLIENT_FD *p=list_head;
while(p->next!=NULL)
{
p=p->next;
if(p->fd==client_fd) //找到在链表里的当前套接字
{
strcpy(p->name,msg_data->name);
}
}
}
/*
函数功能: 获取好友的名称
*/
void List_GetName(struct CLIENT_FD *list_head,struct MSG_DATA *msg_data,int client_fd)
{
struct CLIENT_FD *p=list_head;
while(p->next!=NULL)
{
p=p->next;
if(p->fd==client_fd) //找到在链表里的当前套接字
{
strcpy(msg_data->name,p->name);
}
}
}
\ No newline at end of file
... ...
... ... @@ -9,26 +9,29 @@ import java.util.Scanner;
public class Client {
private static final int BUFFER_SIZE = 1024;
private static final String HOST = "localhost";
private static final int PORT = 8888;
private static final int PORT = 8877;
public static void main(String[] args) {
try {
// 创建SocketChannel对象,连接服务器。
SocketChannel clientChannel = SocketChannel.open();
clientChannel.configureBlocking(false);
clientChannel.connect(new InetSocketAddress(HOST, PORT));
while (!clientChannel.finishConnect()) {
// wait until connection is established
// 如果连接未完成,等待连接完成。
}
System.out.println("Connected to server " + HOST + ":" + PORT);
System.out.println("Enter your name:");
// 读取用户输入的用户名,将其发送给服务器。
Scanner scanner = new Scanner(System.in);
String name = scanner.nextLine();
ByteBuffer buffer = ByteBuffer.wrap(name.getBytes());
clientChannel.write(buffer);
// 创建一个新线程,使用SocketChannel的read()方法读取服务器发送的消息,并在控制台上显示。
new Thread(() -> {
while (true) {
try {
... ... @@ -44,6 +47,7 @@ public class Client {
}
}).start();
// 在主线程中,读取用户输入的消息,使用SocketChannel的write()方法将其发送给服务器。
while (true) {
String message = scanner.nextLine();
buffer = ByteBuffer.wrap(message.getBytes());
... ...
... ... @@ -13,7 +13,7 @@ import java.util.Map;
public class Server {
private static final int BUFFER_SIZE = 1024;
private static final int PORT = 8888;
private static final int PORT = 8877;
private Selector selector;
private Map<SocketChannel, String> clientMap = new HashMap<>();
... ... @@ -25,6 +25,7 @@ public class Server {
public void startServer() {
try {
// 创建Selector对象和ServerSocketChannel对象,并将ServerSocketChannel注册到Selector上,监听连接事件。
selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
... ... @@ -32,16 +33,21 @@ public class Server {
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server started on port " + PORT);
// 进入主循环
while (true) {
// 调用Selector的select()方法等待事件发生
int readyChannels = selector.select();
if (readyChannels == 0) {
continue;
}
// 如果有事件发生,使用迭代器遍历SelectionKey集合,处理每个事件。
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
// 如果是连接事件,使用ServerSocketChannel的accept()方法接受客户端连接,
// 并将客户端的SocketChannel注册到Selector上,监听读事件。
if (key.isAcceptable()) {
SocketChannel clientChannel = serverSocketChannel.accept();
clientChannel.configureBlocking(false);
... ... @@ -50,6 +56,7 @@ public class Server {
clientMap.put(clientChannel, "");
}
// 如果是读事件,使用SocketChannel的read()方法读取客户端发送的消息,并将其转发给其他客户端。
if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
... ... @@ -67,6 +74,7 @@ public class Server {
}
}
// 处理完事件后,使用迭代器的remove()方法将SelectionKey从集合中删除。
keyIterator.remove();
}
}
... ... @@ -79,7 +87,6 @@ public class Server {
for (SocketChannel clientChannel : clientMap.keySet()) {
if (clientChannel != senderChannel) {
System.out.println(clientChannel);
//多个对象发送,每次都得载一个新的对象,不然就只能发送一次
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
clientChannel.write(buffer);
... ...