server

package cn.itcast.netty.c3;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;

@Slf4j

public class EventLoopServer {

public static void main(String[] args) {

// 细分2:创建一个独立的 EventLoopGroup

EventLoopGroup group = new DefaultEventLoopGroup();

new ServerBootstrap()

// boss 和 worker

// 细分1:boss 只负责 ServerSocketChannel 上 accept 事件 worker 只负责 socketChannel 上的读写

.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))

.channel(NioServerSocketChannel.class)

.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(NioSocketChannel ch) throws Exception {

ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {

@Override // ByteBuf

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

ByteBuf buf = (ByteBuf) msg;

log.debug(buf.toString(Charset.defaultCharset()));

ctx.fireChannelRead(msg); // 让消息传递给下一个handler

}

});

/*.addLast(group, "handler2", new ChannelInboundHandlerAdapter() {

@Override // ByteBuf

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

ByteBuf buf = (ByteBuf) msg;

log.debug(buf.toString(Charset.defaultCharset()));

}

});*/

}

})

.bind(8080);

}

}

创建Bootstrap:创建了一个Bootstrap实例,用于启动客户端。 设置EventLoopGroup:通过.group(new NioEventLoopGroup())设置了一个NioEventLoopGroup,用于处理客户端的事件循环。 设置客户端的Channel类型:通过.channel(NioSocketChannel.class)选择了NIO作为客户端的通信模式,使用NioSocketChannel作为客户端的Channel实现。 添加Handler:通过.handler()方法设置了一个ChannelInitializer,用于配置新连接的Channel。在这个初始化器中,添加了一个StringEncoder,用于将字符串编码为ByteBuf发送给服务器。 连接到服务器:通过.connect(new InetSocketAddress("localhost", 8080))方法发起异步非阻塞连接到指定的服务器地址和端口。 处理连接结果:

使用sync方法同步处理结果:通过channelFuture.sync()方法阻塞当前线程,直到连接建立完成。然后获取到连接成功后的Channel对象,并向服务器发送一条消息。使用addListener方法异步处理结果:通过channelFuture.addListener()方法添加一个ChannelFutureListener,用于在连接建立完成后异步处理连接结果。在operationComplete方法中,获取到连接成功后的Channel对象,并向服务器发送一条消息。

这段代码展示了如何使用Netty的异步非阻塞方式连接到服务器,并展示了两种处理连接结果的方法。

client

package cn.itcast.netty.c3;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.Channel;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.handler.codec.string.StringEncoder;

import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

@Slf4j

public class EventLoopClient {

public static void main(String[] args) throws InterruptedException {

// 2. 带有 Future,Promise 的类型都是和异步方法配套使用,用来处理结果

ChannelFuture channelFuture = new Bootstrap()

.group(new NioEventLoopGroup())

.channel(NioSocketChannel.class)

.handler(new ChannelInitializer() {

@Override // 在连接建立后被调用

protected void initChannel(NioSocketChannel ch) throws Exception {

ch.pipeline().addLast(new StringEncoder());

}

})

// 1. 连接到服务器

// 异步非阻塞, main 发起了调用,真正执行 connect 是 nio 线程

.connect(new InetSocketAddress("localhost", 8080)); // 1s 秒后

// 2.1 使用 sync 方法同步处理结果

/*channelFuture.sync(); // 阻塞住当前线程,直到nio线程连接建立完毕

Channel channel = channelFuture.channel();

log.debug("{}", channel);

channel.writeAndFlush("hello, world");*/

// 2.2 使用 addListener(回调对象) 方法异步处理结果

channelFuture.addListener(new ChannelFutureListener() {

@Override

// 在 nio 线程连接建立好之后,会调用 operationComplete

public void operationComplete(ChannelFuture future) throws Exception {

Channel channel = future.channel();

log.debug("{}", channel);

channel.writeAndFlush("hello, world");

}

});

}

}

创建EventLoopGroup:通过EventLoopGroup group = new DefaultEventLoopGroup();创建了一个独立的EventLoopGroup实例,用于处理事件循环。这个DefaultEventLoopGroup是Netty提供的一个实现,它是一个单线程的EventLoopGroup,用于处理任务的执行和事件循环。 创建ServerBootstrap:通过new ServerBootstrap()创建了一个ServerBootstrap实例,用于启动服务器。 设置EventLoopGroup:通过.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))设置了两个事件循环组,其中一个用于处理接受连接的Boss事件循环组,另一个用于处理连接的读写事件的Worker事件循环组。这里使用了默认的NioEventLoopGroup实现,并且指定了Boss事件循环组和Worker事件循环组的线程数量分别为1和2。 设置服务器的Channel类型:通过.channel(NioServerSocketChannel.class)选择了NIO作为服务器的通信模式,使用NioServerSocketChannel作为服务器的Channel实现。 添加Handler:通过.childHandler()方法设置了一个ChannelInitializer,用于配置新连接的Channel。在这个初始化器中,添加了一个ChannelInboundHandlerAdapter,用于处理接收到的消息。在channelRead方法中,打印接收到的消息并调用ctx.fireChannelRead(msg)将消息传递给下一个Handler。 绑定监听端口:通过.bind(8080)方法绑定了服务器监听的端口为8080,开始监听客户端连接。

这段代码演示了如何使用Netty创建一个简单的服务器端程序,并使用独立的EventLoopGroup来处理事件循环。

好文链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: