这篇文章主要介绍“Netty服务端启动源码是什么”,在日常操作中,相信很多人在Netty服务端启动源码是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Netty服务端启动源码是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
创新互联建站从2013年成立,先为石河子等服务建站,石河子等地企业,进行企业商务咨询服务。为石河子企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。
示例从哪里来?任何开源框架都会有自己的示例代码,Netty源码也不例外,如模块netty-example
中就包括了最常见的EchoServer
示例,下面通过这个示例进入服务端启动流程篇章。
public final class EchoServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } // 1. 声明Main-Sub Reactor模式线程池:EventLoopGroup // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); // 创建 EchoServerHandler 对象 final EchoServerHandler serverHandler = new EchoServerHandler(); try { // 2. 声明服务端启动引导器,并设置相关属性 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // 3. 绑定端口即启动服务端,并同步等待 // Start the server. ChannelFuture f = b.bind(PORT).sync(); // 4. 监听服务端关闭,并阻塞等待 // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // 5. 优雅地关闭两个EventLoopGroup线程池 // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
[代码行18、19]声明Main-Sub Reactor
模式线程池:EventLoopGroup
创建两个 EventLoopGroup
对象。其中,bossGroup
用于服务端接受客户端的连接,workerGroup
用于进行客户端的 SocketChannel
的数据读写。
(关于EventLoopGroup
不是本文重点所以在后续文章中进行分析)
[代码行23-39]声明服务端启动引导器,并设置相关属性
AbstractBootstrap
是一个帮助类,通过方法链(method chaining
)的方式,提供了一个简单易用的方式来配置启动一个Channel
。io.netty.bootstrap.ServerBootstrap
,实现 AbstractBootstrap
抽象类,用于 Server
的启动器实现类。io.netty.bootstrap.Bootstrap
,实现 AbstractBootstrap
抽象类,用于 Client
的启动器实现类。如下类图所示:

(在EchoServer
示例代码中,我们看到 ServerBootstrap
的 group
、channel
、option
、childHandler
等属性链式设置都放到关于AbstractBootstrap
体系代码中详细介绍。)
[代码行43]绑定端口即启动服务端,并同步等待
先调用 #bind(int port)
方法,绑定端口,后调用 ChannelFuture#sync()
方法,阻塞等待成功。对于bind
操作就是本文要详细介绍的"服务端启动流程"。
[代码行47]监听服务端关闭,并阻塞等待
先调用 #closeFuture()
方法,监听服务器关闭,后调用 ChannelFuture#sync()
方法,阻塞等待成功。 注意,此处不是关闭服务器,而是channel
的监听关闭。
[代码行51、52]优雅地关闭两个EventLoopGroup
线程池
finally
代码块中执行说明服务端将最终关闭,所以调用 EventLoopGroup#shutdownGracefully()
方法,分别关闭两个EventLoopGroup
对象,终止所有线程。
在服务启动过程的源码分析之前,这里回顾一下我们在通过JDK NIO
编程在服务端启动初始的代码:
serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
这5行代码标示一个最为熟悉的过程:
打开serverSocketChannel
配置非阻塞模式
为channel
的socket
绑定监听端口
创建Selector
将serverSocketChannel
注册到 selector
后面等分析完Netty
的启动过程后,会对这些步骤有一个新的认识。在EchoServer
示例中,进入 #bind(int port)
方法,AbstractBootstrap#bind()
其实有多个方法,方便不同地址参数的传递,实际调用的方法是AbstractBootstrap#doBind(final SocketAddress localAddress)
方法,代码如下:
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
[代码行2] :调用 #initAndRegister()
方法,初始化并注册一个 Channel
对象。因为注册是异步的过程,所以返回一个 ChannelFuture
对象。详细解析,见 「initAndRegister()
」。
[代码行4-6]]:若发生异常,直接进行返回。
[代码行9-34]:因为注册是异步的过程,有可能已完成,有可能未完成。所以实现代码分成了【第 10 至 14 行】和【第 15 至 36 行】分别处理已完成和未完成的情况。
核心在[第 11 、29行],调用 #doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise)
方法,绑定 Channel 的端口,并注册 Channel 到 SelectionKey
中。
如果异步注册对应的 ChanelFuture
未完成,则调用 ChannelFuture#addListener(ChannelFutureListener)
方法,添加监听器,在注册完成后,进行回调执行 #doBind0(...)
方法的逻辑。
通过doBind
方法可以知道服务端启动流程大致如下几个步骤:
从#doBind(final SocketAddress localAddress)
进入到initAndRegister()
:
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
[代码行4]调用 ChannelFactory#newChannel()
方法,创建Channel
对象。 ChannelFactory
类继承如下:
可以在ChannelFactory
注释看到@deprecated Use {@link io.netty.channel.ChannelFactory} instead.
,这里只是包名的调整,对于继承结构不变。netty
默认使用ReflectiveChannelFactory
,我们可以看到重载方法:
@Override public T newChannel() { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } }
很明显,正如其名是通过反射机制构造Channel
对象实例的。constructor
是在其构造方法初始化的:this.constructor = clazz.getConstructor();
这个clazz
按理说应该是我们要创建的Channel
的Class对象。那Class
对象是什么呢?我们接着看channelFactory
是怎么初始化的。
首先在AbstractBootstrap
找到如下代码:
@Deprecated public B channelFactory(ChannelFactory channelFactory) { ObjectUtil.checkNotNull(channelFactory, "channelFactory"); if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); }
调用这个方法的递推向上看到:
public B channel(Class channelClass) { return channelFactory(new ReflectiveChannelFactory( ObjectUtil.checkNotNull(channelClass, "channelClass") )); }
这个方法正是在EchoServer
中ServerBootstrap
链式设置时调用.channel(NioServerSocketChannel.class)
的方法。我们看到,channelClass
就是NioServerSocketChannel.class
,channelFactory
也是以ReflectiveChannelFactory
作为具体实例,并且将NioServerSocketChannel.class
作为构造参数传递初始化的,所以这回答了反射机制构造的是io.netty.channel.socket.nio.NioServerSocketChannel
对象。
继续看NioServerSocketChannel
构造方法逻辑做了什么事情,看之前先给出NioServerSocketChannel
类继承关系:
NioServerSocketChannel
与NioSocketChannel
分别对应服务端和客户端,公共父类都是AbstractNioChannel
和AbstractChannel
,下面介绍创建过程可以参照这个Channel
类继承图。进入NioServerSocketChannel
构造方法:
/** * Create a new instance */ public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
点击newSocket
进去:
private static ServerSocketChannel newSocket(SelectorProvider provider) { try { /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See #2308. */ return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } }
以上传进来的provider
是DEFAULT_SELECTOR_PROVIDER
即默认的java.nio.channels.spi.SelectorProvider
,[代码行9]就是熟悉的jdk nio
创建ServerSocketChannel
。这样newSocket(DEFAULT_SELECTOR_PROVIDER)
就返回了结果ServerSocketChannel
,回到NioServerSocketChannel()#this()
点进去:
/** * Create a new instance using the given {@link ServerSocketChannel}. */ public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
以上super
代表父类AbstractNioMessageChannel
构造方法,点进去看到:
/** * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int) */ protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); }
以上super
代表父类AbstractNioChannel
构造方法,点进去看到:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn("Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
以上[代码行3]将ServerSocketChannel
保存到了AbstractNioChannel#ch
成员变量,在上面提到的NioServerSocketChannel
构造方法的[代码行6]javaChannel()
拿到的就是ch
保存的ServerSocketChannel
变量。
以上[代码行6]就是熟悉的jdk nio
编程设置ServerSocketChannel
非阻塞方式。这里还有super
父类构造方法,点击进去看到:
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
以上构造方法中:
parent
属性,代表父 Channel
对象。对于NioServerSocketChannel
的 parent
为null
。
id
属性,Channel
编号对象。在构造方法中,通过调用 #newId()
方法进行创建。(这里不细展开Problem-1)
unsafe
属性,Unsafe
对象。因为Channel
真正的具体操作,是通过调用对应的 Unsafe
对象实施。所以需要在构造方法中,通过调用 #newUnsafe()
方法进行创建。这里的 Unsafe
并不是我们常说的 jdk
自带的sun.misc.Unsafe
,而是 io.netty.channel.Channel#Unsafe
。(这里不细展开Problem-2)
pipeline
属性默认是DefaultChannelPipeline
对象,赋值后在后面为channel绑定端口的时候会用到
通过以上创建channel
源码过程分析,总结的流程时序图如下:
回到一开始创建Channel
的initAndRegister()
入口方法,在创建Channel
后紧接着init(channel)
进入初始化流程,因为是服务端初始化,所以是ServerBootstrap#init(Channel channel)
,代码如下:
@Override void init(Channel channel) throws Exception { final Map, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map , Object> attrs = attrs0(); synchronized (attrs) { for (Entry , Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey