目录

需求分析

RabbitMQ 客户端设定

ConnectionFactory(连接工厂)

Connection(连接)

Channel(通道)

针对 客户端 和 服务器 单元测试

需求分析

RabbitMQ 客户端设定

一个客户端可以有多个模块每个模块均可以和 Broker Server 之间建立 "逻辑上的连接"(channel)这几个模块的 channel 彼此之间是相互不影响的同时这几个 channel 复用了同一个 TCP 连接此处我们将仿照 RabbitMQ 客户端设定

ConnectionFactory(连接工厂)

这个类持有服务器的地址该类用于创建 Connection 对象

具体代码编写:

import lombok.Getter;

import lombok.Setter;

import java.io.IOException;

@Getter

@Setter

public class ConnectionFactory {

// broker server 的 ip 地址

private String host;

// broker server 的端口号

private int port;

public Connection newConnection() throws IOException {

Connection connection = new Connection(host,port);

return connection;

}

// 访问 broker server 的哪个虚拟主机

// 下列几个属性暂时先不搞了

// private String virtualHostName;

// private String username;

// private String password;

}

Connection(连接)

这个类表示一个 TCP 连接,持有 Socket 对象该类用于写入请求/读取响应,管理多个 Channel 对象

具体代码编写:

编写成员变量

private Socket socket = null;

// 需要管理多个 channel 使用一个 hash 表把若干个 channel 组织起来

private ConcurrentHashMap channelMap = new ConcurrentHashMap<>();

private InputStream inputStream;

private OutputStream outputStream;

private DataOutputStream dataOutputStream;

private DataInputStream dataInputStream;

//用来处理 0xc 的回调,这里开销可能会很大,不希望把 扫描线程 阻塞住,因此使用 线程池 来处理

private ExecutorService callbackPool = null;

编写构造方法此处不仅需要初始化成员变量,还需创建一个扫描线程,不停的从 socket 中读取响应数据,并将读取到的响应交给 dispatchResponse 方法执行

public Connection(String host, int port) throws IOException {

socket = new Socket(host,port);

inputStream = socket.getInputStream();

outputStream = socket.getOutputStream();

dataInputStream = new DataInputStream(inputStream);

dataOutputStream = new DataOutputStream(outputStream);

callbackPool = Executors.newFixedThreadPool(4);

// 创建一个 扫描线程,由这个线程负责不停的从 socket 中取响应数据 把这个响应数据再交给对应的 channel 负责处理

Thread t = new Thread(() -> {

try {

while (!socket.isClosed()) {

Response response = readResponse();

dispatchResponse(response);

}

}catch (SocketException e) {

// 连接正常断开,此时这个异常直接忽略

System.out.println("[Connection] 连接正常断开!");

}catch (IOException | ClassNotFoundException | MqException e) {

System.out.println("[Connection] 连接异常断开!");

e.printStackTrace();

}

});

t.start();

}

编写 dispatchResponse 方法使用该方法来区分,当前响应是一个针对控制请求的响应,还是服务器推送过来的消息如果是服务器推送过来的消息,type = 0xc,也就需要执行回调,通过线程池来执行如果只是一个普通的响应,就将该结果放到 channel 的 哈希表中随后 channel 的 putReturns 方法会唤醒所有阻塞等待的线程,让这些线程从 哈希表中拿与自己 rid 相等的返回结果

// 使用这个方法来分别处理,当前的响应是一个针对控制请求的响应,还是服务器推送的消息

private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {

if(response.getType() == 0xc) {

// 服务器推送来的消息数据

SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());

// 根据 channelId 找到对应的 channel 对象

Channel channel = channelMap.get(subScribeReturns.getChannelId());

if(channel == null) {

throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在!channelId = " + channel.getChannelId());

}

// 执行该 channel 对象内部的回调

// 此处我们直接将回调方法交给线程池来执行,而不是用扫描线程来执行

callbackPool.submit(() -> {

try {

channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(),subScribeReturns.getBasicProperties(),

subScribeReturns.getBody());

} catch (MqException | IOException e) {

e.printStackTrace();

}

});

}else {

// 当前响应是针对刚才控制请求的响应

BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());

// 把这个结果放到对应的 channel 的 hash 表中

Channel channel = channelMap.get(basicReturns.getChannelId());

if(channel == null) {

throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在!channelId = " + channel.getChannelId());

}

channel.putReturns(basicReturns);

}

}

编写 发送请求 与 读取响应 的方法

// 发送请求

public void writeRequest(Request request) throws IOException {

dataOutputStream.writeInt(request.getType());

dataOutputStream.writeInt(request.getLength());

dataOutputStream.write(request.getPayload());

dataOutputStream.flush();

System.out.println("[Connection] 发送请求! type = " + request.getType() + ",length = " + request.getLength());

}

// 读取响应

public Response readResponse() throws IOException {

Response response = new Response();

response.setType(dataInputStream.readInt());

response.setLength(dataInputStream.readInt());

byte[] payload = new byte[response.getLength()];

int n = dataInputStream.read(payload);

if(n != payload.length) {

throw new IOException("读取的响应数据不完整!");

}

response.setPayload(payload);

System.out.println("[Connection] 收到响应! type = " + response.getType() + ",length = " + response.getLength());

return response;

}

编写创建 channel 的方法

注意:

我们的代码中使用了多次 UUID message 的 id,就是用 UUID 当时加了个 M- 前缀现在 channel 的 id 也是使用 UUID 此时加个 C- 前缀rid 也使用 UUID 来生成,加个前缀  R-

// 通过这个方法,在 Connection 中能够创建出一个 Channel

public Channel createChannel() throws IOException, InterruptedException {

String channelId = "C-" + UUID.randomUUID().toString();

Channel channel = new Channel(channelId,this);

// 把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中

channelMap.put(channelId,channel);

// 同时也需要把 创建 channel 的这个消息也告诉服务器

boolean ok = channel.createChannel();

if(!ok) {

// 服务器这里创建失败了!!整个这次创建 channel 操作不顺利!

// 把刚才已经加入 hash 表的键值对,再删了

channelMap.remove(channelId);

return null;

}

return channel;

}

编写释放 Connection 相关资源的方法

public void close() {

// 关闭 Connection 释放上述资源

try {

callbackPool.shutdownNow();

channelMap.clear();

inputStream.close();

outputStream.close();

socket.close();

} catch (IOException e) {

e.printStackTrace();

}

}

Channel(通道)

这个类表示一个逻辑上的连接 该类用于提供一系列的方法,去和服务器提供的核心 API 相对应 客户端提供的这些方法,其方法内部就是发送了一个特定的请求

具体代码编写:

编写成员变量  与 构造方法

private String channelId;

// 当前这个 channel 属于哪个连接的

private Connection connection;

// 用来存储后续客户端收到的服务器的响应

private ConcurrentHashMap basicReturnsMap = new ConcurrentHashMap<>();

// 如果当前 Channel 订阅了某个队列,就需要在此处记录下对应回调是啥,当该队列的消息返回回来的时候,调用回调

// 此处约定一个 Channel 中只能有一个回调

private Consumer consumer = null;

public Channel(String channelId,Connection connection) {

this.channelId = channelId;

this.connection = connection;

}

实现 type = 0x1,即创建 channel构造请求发给服务器,随后阻塞等待,唤醒后从 basicReturnsMap 中尝试获取响应结果其余 type (0xc 除外,因为 0xc 只有响应没有请求)类型的请求与 0x1 大差不差,对着所需参数,构造即可

// 在这个方法中,和服务器进行交互,告知服务器,此处客户端创建了新的 channel 了

public Boolean createChannel() throws IOException, InterruptedException {

// 对于创建 Channel 来说,

BasicArguments basicArguments = new BasicArguments();

basicArguments.setChannelId(channelId);

basicArguments.setRid(generateRid());

byte[] payload = BinaryTool.toBytes(basicArguments);

Request request = new Request();

request.setType(0x1);

request.setLength(payload.length);

request.setPayload(payload);

// 构造出完整请求之后,就可以发送这个请求了

connection.writeRequest(request);

// 等待服务器的响应

BasicReturns basicReturns = waitResult(basicArguments.getRid());

return basicReturns.isOk();

}

private String generateRid() {

return "R-" + UUID.randomUUID().toString();

}

// 期望使用这个方法来阻塞等待服务器的响应

private BasicReturns waitResult(String rid) throws InterruptedException {

BasicReturns basicReturns = null;

while ((basicReturns = basicReturnsMap.get(rid)) == null) {

// 如果查询结果为 null,说明响应还没回来

// 此时就需要阻塞等待

synchronized (this) {

wait();

}

}

// 读取成功之后,还需要把这个消息从 哈希表中给删除掉

basicReturnsMap.remove(rid);

System.out.println("[Channel] 获取到服务器响应!rid = " + rid);

return basicReturns;

}

public void putReturns(BasicReturns basicReturns) {

basicReturnsMap.put(basicReturns.getRid(),basicReturns);

synchronized (this) {

// 当前也不知道有多少个线程在等待上述的这个响应

// 把所有的等待的线程都唤醒

notifyAll();

}

}

特别注意 type = 0xa ,即 订阅消息

其次值得注意的是 consumerTag 使用 channelId 来表示

// 订阅消息

public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException, InterruptedException {

// 先设置回调

if(this.consumer != null) {

throw new MqException("该 channel 已经设置过消费消息的回调了,不能重复设置!");

}

this.consumer = consumer;

BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();

basicConsumeArguments.setRid(generateRid());

basicConsumeArguments.setChannelId(channelId);

// 此处 ConsumerTag 也使用 channelId 来表示

basicConsumeArguments.setConsumerTag(channelId);

basicConsumeArguments.setQueueName(queueName);

basicConsumeArguments.setAutoAck(autoAck);

byte[] payload = BinaryTool.toBytes(basicConsumeArguments);

Request request = new Request();

request.setType(0xa);

request.setLength(payload.length);

request.setPayload(payload);

connection.writeRequest(request);

BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());

return basicReturns.isOk();

}

针对 客户端 和 服务器 单元测试

编写测试用例代码是十分重要的!

package com.example.demo;

import com.example.demo.common.Consumer;

import com.example.demo.common.MqException;

import com.example.demo.mqclient.Channel;

import com.example.demo.mqclient.Connection;

import com.example.demo.mqclient.ConnectionFactory;

import com.example.demo.mqserver.BrokerServer;

import com.example.demo.mqserver.core.BasicProperties;

import com.example.demo.mqserver.core.ExchangeType;

import org.apache.tomcat.util.http.fileupload.FileUtils;

import org.junit.jupiter.api.AfterEach;

import org.junit.jupiter.api.Assertions;

import org.junit.jupiter.api.BeforeEach;

import org.junit.jupiter.api.Test;

import org.springframework.boot.SpringApplication;

import java.io.File;

import java.io.IOException;

public class MqClientTests {

private BrokerServer brokerServer = null;

private ConnectionFactory connectionFactory = null;

private Thread t = null;

@BeforeEach

public void setUp() throws IOException {

// 1、先启动服务器

DemoApplication.context = SpringApplication.run(DemoApplication.class);

brokerServer = new BrokerServer(9090);

t = new Thread(() -> {

// 这个 start 方法会进入一个死循环,使用一个新的线程来运行 start 即可!

try {

brokerServer.start();

} catch (IOException e) {

e.printStackTrace();

}

});

t.start();

// 2、配置 ConnectionFactory

connectionFactory = new ConnectionFactory();

connectionFactory.setHost("127.0.0.1");

connectionFactory.setPort(9090);

}

@AfterEach

public void tearDown() throws IOException {

// 停止服务器

brokerServer.stop();

// t.join();

DemoApplication.context.close();

// 删除必要的文件

File file = new File("./data");

FileUtils.deleteDirectory(file);

connectionFactory = null;

}

@Test

public void testConnection() throws IOException {

Connection connection = connectionFactory.newConnection();

Assertions.assertNotNull(connection);

}

@Test

public void testChannel() throws IOException, InterruptedException {

Connection connection = connectionFactory.newConnection();

Assertions.assertNotNull(connection);

Channel channel = connection.createChannel();

Assertions.assertNotNull(channel);

}

@Test

public void testExchange() throws IOException, InterruptedException {

Connection connection = connectionFactory.newConnection();

Assertions.assertNotNull(connection);

Channel channel = connection.createChannel();

Assertions.assertNotNull(channel);

boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);

Assertions.assertTrue(ok);

ok = channel.exchangeDelete("testExchange");

Assertions.assertTrue(ok);

// 此处稳妥起见,把该关闭的要进行关闭

channel.close();

connection.close();

}

@Test

public void testQueue() throws IOException, InterruptedException{

Connection connection = connectionFactory.newConnection();

Assertions.assertNotNull(connection);

Channel channel = connection.createChannel();

Assertions.assertNotNull(channel);

boolean ok = channel.queueDeclare("testQueue",true,false,false,null);

Assertions.assertTrue(ok);

ok = channel.queueDelete("testQueue");

Assertions.assertTrue(ok);

channel.close();

connection.close();

}

@Test

public void testBinding() throws IOException, InterruptedException{

Connection connection = connectionFactory.newConnection();

Assertions.assertNotNull(connection);

Channel channel = connection.createChannel();

Assertions.assertNotNull(channel);

boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);

Assertions.assertTrue(ok);

ok = channel.queueDeclare("testQueue",true,false,false,null);

Assertions.assertTrue(ok);

ok = channel.queueBind("testQueue","testExchange","testBindingKey");

Assertions.assertTrue(ok);

ok = channel.queueUnbind("testQueue","testExchange");

Assertions.assertTrue(ok);

channel.close();

connection.close();

}

@Test

public void testMessage() throws IOException, InterruptedException, MqException {

Connection connection = connectionFactory.newConnection();

Assertions.assertNotNull(connection);

Channel channel = connection.createChannel();

Assertions.assertNotNull(channel);

boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);

Assertions.assertTrue(ok);

ok = channel.queueDeclare("testQueue",true,false,false,null);

Assertions.assertTrue(ok);

byte[] requestBody = "hello".getBytes();

ok = channel.basicPublish("testExchange","testQueue",null,requestBody);

Assertions.assertTrue(ok);

channel.basicConsume("testQueue", true, new Consumer() {

@Override

public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {

System.out.println("[消费数据] 开始!");

System.out.println("consumeTag = " + consumerTag);

System.out.println("basicProperties = " + basicProperties);

Assertions.assertArrayEquals(requestBody,body);

System.out.println("[消费数据] 结束!");

}

});

Assertions.assertTrue(ok);

Thread.sleep(500);

channel.close();

connection.close();

}

}

相关链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: