server
package cn.itcast.netty.c3;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
// 细分2:创建一个独立的 EventLoopGroup
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
// boss 和 worker
// 细分1:boss 只负责 ServerSocketChannel 上 accept 事件 worker 只负责 socketChannel 上的读写
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
@Override // ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg); // 让消息传递给下一个handler
}
});
/*.addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
@Override // ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});*/
}
})
.bind(8080);
}
}
创建Bootstrap:创建了一个Bootstrap实例,用于启动客户端。 设置EventLoopGroup:通过.group(new NioEventLoopGroup())设置了一个NioEventLoopGroup,用于处理客户端的事件循环。 设置客户端的Channel类型:通过.channel(NioSocketChannel.class)选择了NIO作为客户端的通信模式,使用NioSocketChannel作为客户端的Channel实现。 添加Handler:通过.handler()方法设置了一个ChannelInitializer,用于配置新连接的Channel。在这个初始化器中,添加了一个StringEncoder,用于将字符串编码为ByteBuf发送给服务器。 连接到服务器:通过.connect(new InetSocketAddress("localhost", 8080))方法发起异步非阻塞连接到指定的服务器地址和端口。 处理连接结果:
使用sync方法同步处理结果:通过channelFuture.sync()方法阻塞当前线程,直到连接建立完成。然后获取到连接成功后的Channel对象,并向服务器发送一条消息。使用addListener方法异步处理结果:通过channelFuture.addListener()方法添加一个ChannelFutureListener,用于在连接建立完成后异步处理连接结果。在operationComplete方法中,获取到连接成功后的Channel对象,并向服务器发送一条消息。
这段代码展示了如何使用Netty的异步非阻塞方式连接到服务器,并展示了两种处理连接结果的方法。
client
package cn.itcast.netty.c3;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
// 2. 带有 Future,Promise 的类型都是和异步方法配套使用,用来处理结果
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
// 1. 连接到服务器
// 异步非阻塞, main 发起了调用,真正执行 connect 是 nio 线程
.connect(new InetSocketAddress("localhost", 8080)); // 1s 秒后
// 2.1 使用 sync 方法同步处理结果
/*channelFuture.sync(); // 阻塞住当前线程,直到nio线程连接建立完毕
Channel channel = channelFuture.channel();
log.debug("{}", channel);
channel.writeAndFlush("hello, world");*/
// 2.2 使用 addListener(回调对象) 方法异步处理结果
channelFuture.addListener(new ChannelFutureListener() {
@Override
// 在 nio 线程连接建立好之后,会调用 operationComplete
public void operationComplete(ChannelFuture future) throws Exception {
Channel channel = future.channel();
log.debug("{}", channel);
channel.writeAndFlush("hello, world");
}
});
}
}
创建EventLoopGroup:通过EventLoopGroup group = new DefaultEventLoopGroup();创建了一个独立的EventLoopGroup实例,用于处理事件循环。这个DefaultEventLoopGroup是Netty提供的一个实现,它是一个单线程的EventLoopGroup,用于处理任务的执行和事件循环。 创建ServerBootstrap:通过new ServerBootstrap()创建了一个ServerBootstrap实例,用于启动服务器。 设置EventLoopGroup:通过.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))设置了两个事件循环组,其中一个用于处理接受连接的Boss事件循环组,另一个用于处理连接的读写事件的Worker事件循环组。这里使用了默认的NioEventLoopGroup实现,并且指定了Boss事件循环组和Worker事件循环组的线程数量分别为1和2。 设置服务器的Channel类型:通过.channel(NioServerSocketChannel.class)选择了NIO作为服务器的通信模式,使用NioServerSocketChannel作为服务器的Channel实现。 添加Handler:通过.childHandler()方法设置了一个ChannelInitializer,用于配置新连接的Channel。在这个初始化器中,添加了一个ChannelInboundHandlerAdapter,用于处理接收到的消息。在channelRead方法中,打印接收到的消息并调用ctx.fireChannelRead(msg)将消息传递给下一个Handler。 绑定监听端口:通过.bind(8080)方法绑定了服务器监听的端口为8080,开始监听客户端连接。
这段代码演示了如何使用Netty创建一个简单的服务器端程序,并使用独立的EventLoopGroup来处理事件循环。
好文链接
发表评论