Netty 学习笔记 001

 

Channel

Socket

ChannelHandler

ChannelHandler 的典型用途包括:

  • 将数据从一种格式转换为另一种格式。
  • 提供异常的通知。
  • 提供 Channel 变为活动的或非活动的通知。
  • 提供当 Channel 注册到 EventLoop 或者从 EventLoop 注销时的通知
  • 提供有关用户自定义的通知

Channel 的方法:

  • eventLoop : 返回分配给 Channel 的 EventLoop
  • pipeline : 返回分配给 Channel 的 ChannelPipeline
  • isActive : 如果 Channel是活动的返回 true. 活动的意义依赖于底层的船速.
  • localAddress : 返回本地的 SocketAddress
  • remoteAddress : 返回远程的 SocketAddress
  • write : 将数据写到远程节点.这个数据将被传递给 ChannelPipeline,并且排队直到它被冲刷.
  • flush : 将之前已写的数据冲刷到底层传输, 如一个 Socket
  • writeAndFlush : 等同于上面两个方法连续调用.

写出到 Channel :


Channel channel = ...;
ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);
ChannelFuture cf = channel.writeAndFlush(buf);
// 添加 ChannelFutureListener 在写完成后接收通知.
cf.addListener(new ChannelFutureListener() {
    @Override public void operationComplete(ChannelFuture future) {
        if (future.isSuccess()) { 
        // 写操作成功	
            System.out.println("OK");
        } elst {
            System.err.println("write error");
            future.cause().printStackTrace();
        }
    }
});

Netty 的 Channel 实现是线程安全的,因此可以存储一个 Channel 的引用,并且当需要向远程节点写数据时, 都可以使用它,即使当前许多线程都在使用它.

Netty 锁提供的传输

EventLoop

控制流、多线程处理、并发

ChannelFuture

异步通知

启动(服务端)

要启动一个Netty服务端,必须要指定三类属性,分别是线程模型、IO 模型、连接读写处理逻辑,有了这三者,之后在调用 bind(8000),我们就可以在本地绑定一个 8000 端口启动起来。

package com.example.myscala002.demo1;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class DemoNettyServer001 {

    private static final Logger LOGGER = LoggerFactory.getLogger(DemoNettyServer001.class);


    public void start() {

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {

            }
        });


        ChannelFuture channelFuture = serverBootstrap.bind(9002);

        channelFuture.addListener(new GenericFutureListener<Future<? super Void>>(){

            @Override
            public void operationComplete(Future<? super Void> future) throws Exception {
                if (future.isSuccess()) {
                    LOGGER.info("绑定端口成功");
                } else {
                    LOGGER.error("绑定端口失败");
                }
            }
        } );

    }
}


以上代码:

  • 创建了两个 NioEventLoopGroup, 这两个对象可以看做是传统 IO 编程模型的两大线程组

bossGroup 表示监听端口,accept 新连接的线程组,workerGroup 表示处理每一条连接的数据读写的线程组。

  • 创建了一个引导类 ServerBootstrap,这个类将引导我们进行服务端的启动工作。
  • 我们通过 .group(bossGroup, workerGroup) 给引导类配置两大线程组。
  • 然后,我们指定我们服务端的 IO 模型为 NIO,我们通过 .channel(NioServerSocketChannel.class) 来指定 IO 模型。
  • 接着,我们调用 childHandler() 方法,给这个引导类创建一个 ChannelInitializer,这里主要就是定义后续每条连接的数据读写,业务处理逻辑。ChannelInitializer这个类中。

NioSocketChannel,这个类呢,就是 Netty 对 NIO 类型的连接的抽象。

服务端启动其他方法

handler() 方法

handler() 方法,可以和我们前面分析的 childHandler() 方法对应起来,childHandler() 用于指定处理新连接数据的读写处理逻辑,handler() 用于指定在服务端启动过程中的一些逻辑,通常情况下我们用不着这个方法。

        serverBootstrap.handler(new ChannelInitializer() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                System.out.println("服务端启动中");
            }
        });

attr() 方法

serverBootstrap.attr(AttributeKey.newInstance("serverName"), "nettyServer")

attr() 方法可以给服务端的 channel,也就是 NioServerSocketChannel 指定一些自定义属性,然后我们可以通过 channel.attr() 取出这个属性,比如,上面的代码我们指定我们服务端 channel 的一个 serverName 属性,属性值为 nettyServe ,其实说白了就是给 NioServerSocketChannel 维护一个 map而已,通常情况下,我们也用不上这个方法。

除了可以给服务端 channel NioServerSocketChannel 指定一些自定义属性之外,我们还可以给每一条连接指定自定义属性。

childAttr() 方法

serverBootstrap.childAttr(AttributeKey.newInstance("clientKey"), "clientValue")

上面的 childAttr() 可以给每一条连接指定自定义属性,然后后续我们可以通过 channel.attr() 取出该属性。

childOption() 方法

serverBootstrap
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.TCP_NODELAY, true)

childOption() 可以给每条连接设置一些TCP底层相关的属性,比如上面,我们设置了两种 TCP 属性,其中

  • ChannelOption.SO_KEEPALIVE 表示是否开启 TCP 底层心跳机制,true 为开启
  • ChannelOption.TCP_NODELAY 表示是否开启Nagle算法,true 表示关闭,false 表示开启,通俗地说,如果要求高实时性,有数据发送时就马上发送,就关闭,如果需要减少发送次数减少网络交互,就开启。

其他的参数这里就不一一讲解,有兴趣的同学可以去这个类里面自行研究。

option() 方法

除了给每个连接设置这一系列属性之外,我们还可以给服务端 channel 设置一些属性,最常见的就是so_backlog,如下设置

serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024)

表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数

启动(客户端)

对于客户端的启动来说,和服务端的启动类似,依然需要线程模型、IO 模型,以及 IO 业务处理逻辑三大参数,下面,我们来看一下客户端启动的标准流程


package com.example.myscala002.demo1;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class DemoNettyClient001 {


    private static final Logger LOGGER = LoggerFactory.getLogger(DemoNettyClient001.class);


    private static final int MAX_RETRY = 5;

    private static void connect(final Bootstrap bootstrap, final String host, final int port, final int retry) {
        
        bootstrap.connect(host, port).addListener(future -> {
            if (future.isSuccess()) {
                LOGGER.info("连接成功!");
            } else if (retry == 0) {
                LOGGER.warn("重试次数已用完,放弃连接!");
            } else {
                // 第几次重连
                int order = (MAX_RETRY - retry) + 1;
                // 本次重连的间隔
                int delay = 1 << order;
                LOGGER.warn("{} : 连接失败,第 {} 次重连……", new Date(), order);
                bootstrap.config()
                        .group()
                        .schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
            }
        });
    }
    public void start() {

        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();

        // 1: 指定线程模型
        bootstrap.group(workerGroup);

        // 2: 指定 IO 类型为 NIO
        bootstrap.channel(NioSocketChannel.class);

        // 3: IO 处理逻辑
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {

            }
        });

        ChannelFuture future = bootstrap.connect("40.73.4.33", 80);

        // 4:建立连接
        future.addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future) throws Exception {
                if (future.isSuccess()) {
                    LOGGER.info("连接成功");
                } else {
                    LOGGER.info("连接失败");
                    LOGGER.info("future ==> {}", future);
                }
            }
        });
    }

    public static void main(String[] args) {
        new DemoNettyClient001().start();
    }
}

其他方法:

attr() 方法

bootstrap.attr(AttributeKey.newInstance("clientName"), "nettyClient")

option() 方法


Bootstrap
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.TCP_NODELAY, true)

option() 方法可以给连接设置一些 TCP 底层相关的属性,比如上面,我们设置了三种 TCP 属性,其中

  • ChannelOption.CONNECT_TIMEOUT_MILLIS 表示连接的超时时间,超过这个时间还是建立不上的话则代表连接失败
  • ChannelOption.SO_KEEPALIVE 表示是否开启 TCP 底层心跳机制,true 为开启
  • ChannelOption.TCP_NODELAY 表示是否开始 Nagle 算法,true 表示关闭,false 表示开启,通俗地说,如果要求高实时性,有数据发送时就马上发送,就设置为 true 关闭,如果需要减少发送次数减少网络交互,就设置为 false 开启

« EOF »

If you like TeXt, don’t forget to give me a star :star2:.