Channel
Socket
ChannelHandler
ChannelHandler 的典型用途包括:
- 将数据从一种格式转换为另一种格式。
- 提供异常的通知。
- 提供 Channel 变为活动的或非活动的通知。
- 提供当 Channel 注册到 EventLoop 或者从 EventLoop 注销时的通知
- 提供有关用户自定义的通知
Channel 的方法:
-
eventLoop
: 返回分配给 Channel 的 EventLoop -
pipeline
: 返回分配给 Channel 的 ChannelPipeline -
isActive
: 如果 Channel是活动的返回 true. 活动的意义依赖于底层的船速. -
localAddress
: 返回本地的 SocketAddress -
remoteAddress
: 返回远程的 SocketAddress -
write
: 将数据写到远程节点.这个数据将被传递给 ChannelPipeline,并且排队直到它被冲刷. -
flush
: 将之前已写的数据冲刷到底层传输, 如一个 Socket -
writeAndFlush
: 等同于上面两个方法连续调用.
写出到 Channel :
Channel channel = ...;
ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);
ChannelFuture cf = channel.writeAndFlush(buf);
// 添加 ChannelFutureListener 在写完成后接收通知.
cf.addListener(new ChannelFutureListener() {
@Override public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// 写操作成功
System.out.println("OK");
} elst {
System.err.println("write error");
future.cause().printStackTrace();
}
}
});
Netty 的 Channel 实现是线程安全的,因此可以存储一个 Channel 的引用,并且当需要向远程节点写数据时, 都可以使用它,即使当前许多线程都在使用它.
Netty 锁提供的传输
EventLoop
控制流、多线程处理、并发
ChannelFuture
异步通知
启动(服务端)
要启动一个Netty服务端,必须要指定三类属性,分别是线程模型、IO 模型、连接读写处理逻辑,有了这三者,之后在调用 bind(8000)
,我们就可以在本地绑定一个 8000 端口启动起来。
package com.example.myscala002.demo1;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class DemoNettyServer001 {
private static final Logger LOGGER = LoggerFactory.getLogger(DemoNettyServer001.class);
public void start() {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
}
});
ChannelFuture channelFuture = serverBootstrap.bind(9002);
channelFuture.addListener(new GenericFutureListener<Future<? super Void>>(){
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
LOGGER.info("绑定端口成功");
} else {
LOGGER.error("绑定端口失败");
}
}
} );
}
}
以上代码:
- 创建了两个 NioEventLoopGroup, 这两个对象可以看做是传统 IO 编程模型的两大线程组。
bossGroup 表示监听端口,accept 新连接的线程组,workerGroup 表示处理每一条连接的数据读写的线程组。
- 创建了一个引导类 ServerBootstrap,这个类将引导我们进行服务端的启动工作。
- 我们通过
.group(bossGroup, workerGroup)
给引导类配置两大线程组。 - 然后,我们指定我们服务端的 IO 模型为 NIO,我们通过
.channel(NioServerSocketChannel.class)
来指定 IO 模型。 - 接着,我们调用
childHandler()
方法,给这个引导类创建一个 ChannelInitializer,这里主要就是定义后续每条连接的数据读写,业务处理逻辑。ChannelInitializer这个类中。
NioSocketChannel
,这个类呢,就是 Netty 对 NIO 类型的连接的抽象。
服务端启动其他方法
handler()
方法
handler()
方法,可以和我们前面分析的 childHandler()
方法对应起来,childHandler()
用于指定处理新连接数据的读写处理逻辑,handler()
用于指定在服务端启动过程中的一些逻辑,通常情况下我们用不着这个方法。
serverBootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
System.out.println("服务端启动中");
}
});
attr()
方法
serverBootstrap.attr(AttributeKey.newInstance("serverName"), "nettyServer")
attr()
方法可以给服务端的 channel
,也就是 NioServerSocketChannel
指定一些自定义属性,然后我们可以通过 channel.attr()
取出这个属性,比如,上面的代码我们指定我们服务端 channel
的一个 serverName 属性,属性值为 nettyServe ,其实说白了就是给 NioServerSocketChannel
维护一个 map而已,通常情况下,我们也用不上这个方法。
除了可以给服务端 channel NioServerSocketChannel 指定一些自定义属性之外,我们还可以给每一条连接指定自定义属性。
childAttr()
方法
serverBootstrap.childAttr(AttributeKey.newInstance("clientKey"), "clientValue")
上面的 childAttr()
可以给每一条连接指定自定义属性,然后后续我们可以通过 channel.attr()
取出该属性。
childOption()
方法
serverBootstrap
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
childOption()
可以给每条连接设置一些TCP底层相关的属性,比如上面,我们设置了两种 TCP 属性,其中
-
ChannelOption.SO_KEEPALIVE
表示是否开启 TCP 底层心跳机制,true 为开启 -
ChannelOption.TCP_NODELAY
表示是否开启Nagle算法,true 表示关闭,false 表示开启,通俗地说,如果要求高实时性,有数据发送时就马上发送,就关闭,如果需要减少发送次数减少网络交互,就开启。
其他的参数这里就不一一讲解,有兴趣的同学可以去这个类里面自行研究。
option()
方法
除了给每个连接设置这一系列属性之外,我们还可以给服务端 channel 设置一些属性,最常见的就是so_backlog
,如下设置
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024)
表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
启动(客户端)
对于客户端的启动来说,和服务端的启动类似,依然需要线程模型、IO 模型,以及 IO 业务处理逻辑三大参数,下面,我们来看一下客户端启动的标准流程
package com.example.myscala002.demo1;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DemoNettyClient001 {
private static final Logger LOGGER = LoggerFactory.getLogger(DemoNettyClient001.class);
private static final int MAX_RETRY = 5;
private static void connect(final Bootstrap bootstrap, final String host, final int port, final int retry) {
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
LOGGER.info("连接成功!");
} else if (retry == 0) {
LOGGER.warn("重试次数已用完,放弃连接!");
} else {
// 第几次重连
int order = (MAX_RETRY - retry) + 1;
// 本次重连的间隔
int delay = 1 << order;
LOGGER.warn("{} : 连接失败,第 {} 次重连……", new Date(), order);
bootstrap.config()
.group()
.schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
}
});
}
public void start() {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
// 1: 指定线程模型
bootstrap.group(workerGroup);
// 2: 指定 IO 类型为 NIO
bootstrap.channel(NioSocketChannel.class);
// 3: IO 处理逻辑
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
}
});
ChannelFuture future = bootstrap.connect("40.73.4.33", 80);
// 4:建立连接
future.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
LOGGER.info("连接成功");
} else {
LOGGER.info("连接失败");
LOGGER.info("future ==> {}", future);
}
}
});
}
public static void main(String[] args) {
new DemoNettyClient001().start();
}
}
其他方法:
attr()
方法
bootstrap.attr(AttributeKey.newInstance("clientName"), "nettyClient")
option()
方法
Bootstrap
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
option()
方法可以给连接设置一些 TCP 底层相关的属性,比如上面,我们设置了三种 TCP 属性,其中
-
ChannelOption.CONNECT_TIMEOUT_MILLIS
表示连接的超时时间,超过这个时间还是建立不上的话则代表连接失败 -
ChannelOption.SO_KEEPALIVE
表示是否开启 TCP 底层心跳机制,true 为开启 -
ChannelOption.TCP_NODELAY
表示是否开始 Nagle 算法,true 表示关闭,false 表示开启,通俗地说,如果要求高实时性,有数据发送时就马上发送,就设置为 true 关闭,如果需要减少发送次数减少网络交互,就设置为 false 开启
« EOF »
If you like TeXt, don’t forget to give me a star .