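I. Basic Concepts
1. Channel: an abstraction over a TCP connection; read and write operations go through it.
2. Buffer: simply a block of memory. A channel reads incoming data into a buffer and writes outgoing data from a buffer.
3. Selector: lets a single thread operate on multiple channels. A channel has to be registered with the selector before it can be used this way.
4. Bootstrap: a convenience factory class provided by Netty, used to initialize a Netty client or server.
5. EventLoopGroup: a group of threads.
6. EventLoop: can be thought of as an abstraction of a single thread.
7. ChannelPipeline: a chain of handlers, an application of the Chain of Responsibility pattern. We implement business logic by adding our own handlers to this chain.
8. Unsafe: an object inside the channel that encapsulates the low-level socket operations.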
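To make the first three concepts concrete, here is a minimal sketch that uses only the JDK (no Netty). It sends a message through an in-process java.nio.channels.Pipe, registers the readable end with a Selector, and reads the bytes back into a ByteBuffer. The class name NioConceptsDemo and the method roundTrip are made up for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: channel + buffer + selector with plain JDK NIO.
final class NioConceptsDemo {

    /** Writes the message into a pipe and reads it back once the selector reports readiness. */
    static String roundTrip(String message) {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {

                // A channel must be non-blocking before it can be registered with a selector.
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                // Writing: the channel drains bytes out of the buffer.
                ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    sink.write(out);
                }

                // Reading: the channel fills the buffer once the selector says data is ready.
                ByteBuffer in = ByteBuffer.allocate(out.capacity());
                while (in.hasRemaining()) {
                    selector.select();               // blocks until the source is readable
                    selector.selectedKeys().clear(); // only one channel is registered here
                    if (source.read(in) < 0) {
                        break;                       // end of stream
                    }
                }
                in.flip();                           // switch the buffer from filling to draining
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello netty"));
    }
}
```

Netty wraps this same trio: its Channel sits on top of a JDK channel, its ByteBuf plays the role of the buffer, and each EventLoop owns a selector.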
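II. Flow Analysis
1. Client:
The analysis here follows the EchoClient from the examples that ship with the Netty project.
The client code is as follows: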
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.ssl.SslContext;
    import io.netty.handler.ssl.SslContextBuilder;
    import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

    // EchoClientHandler lives in the same example package.
    public final class EchoClient {

        static final boolean SSL = System.getProperty("ssl") != null;
        static final String HOST = System.getProperty("host", "127.0.0.1");
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
        static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

        public static void main(String[] args) throws Exception {
            // Configure SSL.
            final SslContext sslCtx;
            if (SSL) {
                sslCtx = SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
            } else {
                sslCtx = null;
            }

            // Configure the client.
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         if (sslCtx != null) {
                             p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                         }
                         //p.addLast(new LoggingHandler(LogLevel.INFO));
                         p.addLast(new EchoClientHandler());
                     }
                 });

                // Start the client.
                ChannelFuture f = b.connect(HOST, PORT).sync();

                // Wait until the connection is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shut down the event loop to terminate all threads.
                group.shutdownGracefully();
            }
        }
    }
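First, the initialization steps:
1. NioSocketChannel initialization:
Initialization needs a ChannelFactory. In the client code, b.group(group).channel(NioSocketChannel.class) installs a ReflectiveChannelFactory on the bootstrap, as the code below shows: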
    b.group(group)
     .channel(NioSocketChannel.class)

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

    @SuppressWarnings({ "unchecked", "deprecation" })
    public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
        return channelFactory((ChannelFactory<C>) channelFactory);
    }

    // In the end, AbstractBootstrap stores the ChannelFactory in a field.
    @Deprecated
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return self();
    }
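The ChannelFactory interface has a single method:
    @Deprecated
    public interface ChannelFactory<T extends Channel> {
        /**
         * Creates a new channel.
         */
        T newChannel();
    }
newChannel() is the method that actually creates the channel. The next step is to find where it is called.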
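Before following the call site, it may help to see what a reflective factory boils down to. The sketch below is a simplified stand-in written against the JDK only: SimpleFactory, ReflectiveFactory and FakeChannel are hypothetical names, not Netty classes, and the real ReflectiveChannelFactory may differ in detail. The idea is the same, though: look up the no-arg constructor once, then invoke it every time a new instance is requested.

```java
import java.lang.reflect.Constructor;

// Hypothetical, JDK-only illustration of a reflection-based factory.
final class ReflectiveFactoryDemo {

    /** Stand-in for a factory interface with a single creation method. */
    interface SimpleFactory<T> {
        T newInstance();
    }

    /** Creates instances by invoking the no-arg constructor through reflection. */
    static final class ReflectiveFactory<T> implements SimpleFactory<T> {
        private final Constructor<? extends T> constructor;

        ReflectiveFactory(Class<? extends T> clazz) {
            try {
                // Resolve the constructor once, at configuration time.
                this.constructor = clazz.getDeclaredConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(
                        clazz.getSimpleName() + " has no no-arg constructor", e);
            }
        }

        @Override
        public T newInstance() {
            try {
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create instance", e);
            }
        }
    }

    /** Hypothetical channel type; only its no-arg constructor matters here. */
    static final class FakeChannel {
        FakeChannel() {
        }
    }

    public static void main(String[] args) {
        SimpleFactory<FakeChannel> factory = new ReflectiveFactory<>(FakeChannel.class);
        System.out.println(factory.newInstance() != factory.newInstance());
    }
}
```

This is why passing NioSocketChannel.class to channel(...) is enough: the class only needs a no-arg constructor for the factory to build it later.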
    ChannelFuture f = b.connect(HOST, PORT).sync();
This call creates a channel. The call chain is:
    public ChannelFuture connect(String inetHost, int inetPort) {
        return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
    }

    public ChannelFuture connect(SocketAddress remoteAddress) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }

        validate();
        return doResolveAndConnect(remoteAddress, config.localAddress());
    }

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        // The channel is created here.
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                    // failure.
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
    final ChannelFuture regFuture = initAndRegister();

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }
newChannel() creates a NioSocketChannel, so the no-arg constructor of NioSocketChannel is invoked. The flow is:
    // Step 1:
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public NioSocketChannel() {
        this(DEFAULT_SELECTOR_PROVIDER);
    }

    public NioSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }

    private static SocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openSocketChannel();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a socket.", e);
        }
    }

    public NioSocketChannel(SocketChannel socket) {
        this(null, socket);
    }

    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

    // Step 2: call the superclass constructor
    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

    // Step 3: call the next superclass constructor
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

    // Step 4: and the superclass constructor above that
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
At this point the initialization of a channel is complete.
Summary:
NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) opens a new Java NIO SocketChannel.
AbstractChannel:
parent is null.
unsafe is instantiated through newUnsafe().
pipeline is an instance created by new DefaultChannelPipeline(this), which also tells us that a Channel and its ChannelPipeline are in a one-to-one relationship.
AbstractNioChannel:
The SelectableChannel (field ch) is the SocketChannel instance we just created.
readInterestOp is SelectionKey.OP_READ.
The SelectableChannel is switched to non-blocking mode.
NioSocketChannel:
SocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket())
Unsafe initialization:
    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioSocketChannelUnsafe();
    }
Pipeline initialization:
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise = new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }
The pipeline is really a doubly linked list that starts out with just a head and a tail. Your own handlers are inserted into this list and process events there. Note that head acts as an outbound handler and tail as an inbound handler (in Netty 4.1, HeadContext also implements ChannelInboundHandler so that it can pass inbound events along).
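The linked-list structure can be sketched in a few lines of plain Java. MiniPipeline and Node below are hypothetical names for illustration only; in Netty the nodes are ChannelHandlerContext objects and carry far more state. The sketch shows the two sentinel nodes and how addLast splices a new node in just before the tail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a pipeline as a doubly linked list.
final class MiniPipeline {

    /** One node of the list. */
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    MiniPipeline() {
        // An empty pipeline consists only of the two sentinels.
        head.next = tail;
        tail.prev = head;
    }

    /** Inserts a node just before the tail sentinel. */
    MiniPipeline addLast(String name) {
        Node node = new Node(name);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
        return this;
    }

    /** Names in head-to-tail order, the direction inbound events travel. */
    List<String> inboundOrder() {
        List<String> names = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }

    /** Names in tail-to-head order, the direction outbound operations travel. */
    List<String> outboundOrder() {
        List<String> names = new ArrayList<>();
        for (Node n = tail; n != null; n = n.prev) {
            names.add(n.name);
        }
        return names;
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline().addLast("ssl").addLast("echo");
        System.out.println(p.inboundOrder());
        System.out.println(p.outboundOrder());
    }
}
```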
2. EventLoop initialization:
EventLoop initialization ends up in the following constructors:
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
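At initialization time:
    EventLoopGroup group = new NioEventLoopGroup();
When no thread count is passed and the io.netty.eventLoopThreads system property is not set, the count defaults to the number of processor cores * 2.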
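The defaulting rule can be written down with JDK calls only. This is a sketch of the rule as described, not Netty's own code: DefaultThreads and defaultEventLoopThreads are hypothetical names, and Netty reads the property and the processor count through its own utility classes. The lower bound of 1 is an assumption about how a non-positive property value is handled.

```java
// Hypothetical helper that mirrors the defaulting rule with JDK calls only.
final class DefaultThreads {

    /** Returns the property value if set, otherwise cores * 2, never less than 1. */
    static int defaultEventLoopThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        // Integer.getInteger returns the fallback when the property is missing or not a number.
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", fallback));
    }

    public static void main(String[] args) {
        System.out.println(defaultEventLoopThreads());
    }
}
```

Running it with -Dio.netty.eventLoopThreads=4 shows the property taking precedence over the core count.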
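The logic of the MultithreadEventExecutorGroup constructor is: first create an array of size nThreads, then call newChild to fill each slot. Netty's groups are built on exactly this: the group maintains an array of EventExecutors, and whenever Netty needs an EventLoop it calls next() to pick one. (In effect, an EventLoopGroup can be understood as a MultithreadEventExecutorGroup.) The first time execute is called on an EventLoop, it calls startThread, which in turn calls doStartThread. That method assigns the thread it is running on to the thread field of SingleThreadEventExecutor, so every EventLoop ends up with one thread bound to it. It then calls
    SingleThreadEventExecutor.this.run();
which keeps the loop running and processing tasks from then on.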
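The three ideas in this step (an array of children, next() picking one of them, and a thread that is bound lazily on the first execute) can be put together in a small JDK-only sketch. MiniEventLoopGroup and MiniEventLoop are hypothetical names; the real NioEventLoop also drives a selector, handles shutdown and survives failing tasks, none of which is modeled here. Simple round-robin is assumed for next().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of an event loop group.
final class MiniEventLoopGroup {

    /** A single-threaded loop whose thread is created on the first execute(). */
    static final class MiniEventLoop {
        private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
        private final AtomicBoolean started = new AtomicBoolean();
        private volatile Thread thread;

        /** True when the caller is the thread bound to this loop. */
        boolean inEventLoop() {
            return Thread.currentThread() == thread;
        }

        void execute(Runnable task) {
            tasks.add(task);
            // Only the first caller starts the thread.
            if (started.compareAndSet(false, true)) {
                Thread t = new Thread(this::run, "mini-loop");
                t.setDaemon(true); // let the JVM exit; this sketch has no shutdown logic
                t.start();
            }
        }

        private void run() {
            thread = Thread.currentThread(); // bind the loop to the thread running it
            try {
                for (;;) {
                    tasks.take().run();      // process tasks one by one, forever
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private final MiniEventLoop[] children;
    private final AtomicInteger index = new AtomicInteger();

    MiniEventLoopGroup(int nThreads) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException("nThreads: " + nThreads + " (expected: > 0)");
        }
        children = new MiniEventLoop[nThreads];
        for (int i = 0; i < nThreads; i++) {
            children[i] = new MiniEventLoop();
        }
    }

    /** Picks the next loop in round-robin order. */
    MiniEventLoop next() {
        return children[Math.floorMod(index.getAndIncrement(), children.length)];
    }

    public static void main(String[] args) {
        MiniEventLoopGroup group = new MiniEventLoopGroup(2);
        MiniEventLoop loop = group.next();
        CompletableFuture<Boolean> inside = new CompletableFuture<>();
        loop.execute(() -> inside.complete(loop.inEventLoop()));
        System.out.println("inside task: " + inside.join());
        System.out.println("from main: " + loop.inEventLoop());
    }
}
```

Because every task submitted to one loop runs on the same thread, code that only touches a channel from its own loop needs no further locking, which is the point of binding each channel to a single EventLoop.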
3. Channel registration:
The call chain is:
    initAndRegister();
    ChannelFuture regFuture = config().group().register(channel);

    // MultithreadEventLoopGroup
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    // SingleThreadEventLoop
    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    // SingleThreadEventLoop
    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register
Inside the Unsafe class:
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // Assign the chosen EventLoop to the channel's eventLoop field.
        AbstractChannel.this.eventLoop = eventLoop;

        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            registered = true;

            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            pipeline.invokeHandlerAddedIfNeeded();

            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
The call finally lands in AbstractNioChannel.doRegister, which ties the channel to the selector owned by the EventLoop. It does one more thing along the way: it passes the current Channel as the attachment of the selection key.
Registration path: AbstractBootstrap.initAndRegister -> group().register(channel) -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> channel.unsafe().register(this, promise) -> AbstractUnsafe.register -> register0 -> AbstractNioChannel.doRegister -> javaChannel().register(eventLoop().unwrappedSelector(), 0, this).
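The last hop in that path is a plain JDK call, so it can be tried in isolation. The sketch below registers a non-blocking SocketChannel with interest set 0 and an attachment, then switches OP_READ on afterwards. RegisterDemo and Wrapper are hypothetical names; Wrapper merely stands in for the Netty channel object that is attached to the key. Enabling OP_READ in a later step is my reading of why 0 is passed at registration time, so treat that part as an assumption.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical demo of registering a JDK channel with an attachment.
final class RegisterDemo {

    /** Stand-in for the higher-level channel object that wraps the JDK channel. */
    static final class Wrapper {
        final SocketChannel javaChannel;

        Wrapper(SocketChannel javaChannel) {
            this.javaChannel = javaChannel;
        }
    }

    /** Registers with interest set 0, checks the attachment, then enables OP_READ. */
    static boolean registerAndVerify() {
        try (Selector selector = Selector.open();
             SocketChannel ch = SocketChannel.open()) {
            ch.configureBlocking(false); // required before register()
            Wrapper wrapper = new Wrapper(ch);

            // Interest set 0: the key exists, but no event is being watched yet.
            SelectionKey key = ch.register(selector, 0, wrapper);
            boolean attached = key.attachment() == wrapper && key.interestOps() == 0;

            // Later, the interest in reads is switched on without re-registering.
            key.interestOps(SelectionKey.OP_READ);
            return attached && key.interestOps() == SelectionKey.OP_READ;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(registerAndVerify());
    }
}
```

The attachment is what lets the event loop get from a ready SelectionKey back to the higher-level channel object when an I/O event fires.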
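Once registration has finished, the following is called
    pipeline.fireChannelRegistered();
which is how our own business handlers get loaded into the pipeline. (Note that in the register0 listing above, pipeline.invokeHandlerAddedIfNeeded() runs first; in Netty 4.1 releases where ChannelInitializer also implements handlerAdded, initChannel may already have run at that point.)
Once the connection has been established, the following is called
    pipeline().fireChannelActive();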
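4. Adding handlers:
Bootstrap.handler(...) is the entry point for adding handlers: it records our ChannelInitializer, which is later placed into the pipeline.
The call chain is: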
register
register0
pipeline.fireChannelRegistered();
AbstractChannelHandlerContext.invokeChannelRegistered(head);
next.invokeChannelRegistered();
((ChannelInboundHandler) handler()).channelRegistered(this);
initChannel(ctx)
initChannel((C) ctx.channel());
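At this point the handler-loading logic we wrote in our initChannel override is executed, and finally the following is called
    remove(ctx);
remove(ctx) is needed because initAndRegister() called init(channel), which added our ChannelInitializer itself to the pipeline. Once it has installed the real handlers it has no further use, so it removes itself.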
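The add-then-remove-itself behavior is easy to model without Netty. In the sketch below the pipeline is just a List, and Initializer, Named and channelRegistered are hypothetical stand-ins for ChannelInitializer, a business handler and the registration callback. It only illustrates the ordering described above, not Netty's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a self-removing initializer.
final class InitializerDemo {

    interface Handler {
        String name();
    }

    /** A plain handler identified by its name. */
    static final class Named implements Handler {
        private final String name;

        Named(String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name;
        }
    }

    /** One-shot handler: installs the real handlers, then removes itself. */
    abstract static class Initializer implements Handler {
        @Override
        public String name() {
            return "initializer";
        }

        abstract void initChannel(List<Handler> pipeline);

        final void channelRegistered(List<Handler> pipeline) {
            initChannel(pipeline);  // user code adds the business handlers
            pipeline.remove(this);  // the initializer is no longer needed
        }
    }

    static List<String> names(List<Handler> pipeline) {
        List<String> result = new ArrayList<>();
        for (Handler h : pipeline) {
            result.add(h.name());
        }
        return result;
    }

    /** Returns the handler names before and after registration. */
    static List<List<String>> demo() {
        List<Handler> pipeline = new ArrayList<>();
        Initializer init = new Initializer() {
            @Override
            void initChannel(List<Handler> p) {
                p.add(new Named("echoClientHandler"));
            }
        };
        pipeline.add(init);                     // what init(channel) does up front
        List<String> before = names(pipeline);
        init.channelRegistered(pipeline);       // fired once registration is done
        List<String> after = names(pipeline);
        List<List<String>> snapshots = new ArrayList<>();
        snapshots.add(before);
        snapshots.add(after);
        return snapshots;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```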
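Netty's event propagation mechanism, by example:
    public class MyInboundHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            System.out.println("Connected!");
            ctx.fireChannelActive();
        }
    }
When channelActive is invoked on a handler, the event stops there unless the handler passes it on. To let it continue to the next handler, call ctx.fireChannelActive().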
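The effect of leaving out that call can be shown with a toy chain. Everything in this sketch is hypothetical (PropagationDemo, Context, Handler, fireActive) and only imitates the shape of Netty's API: each handler receives a context that knows its position, and the next handler runs only if the current one asks the context to fire the event onward.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of event propagation along a handler chain.
final class PropagationDemo {

    interface Handler {
        void channelActive(Context ctx);
    }

    /** Knows where the current handler sits in the chain. */
    static final class Context {
        private final List<Handler> handlers;
        private final int index;
        private final List<String> log;

        Context(List<Handler> handlers, int index, List<String> log) {
            this.handlers = handlers;
            this.index = index;
            this.log = log;
        }

        void log(String message) {
            log.add(message);
        }

        /** Passes the event to the next handler, if there is one. */
        void fireActive() {
            int next = index + 1;
            if (next < handlers.size()) {
                handlers.get(next).channelActive(new Context(handlers, next, log));
            }
        }
    }

    /** Runs a two-handler chain and returns the names of the handlers that saw the event. */
    static List<String> run(boolean firstPropagates) {
        List<String> log = new ArrayList<>();
        List<Handler> handlers = new ArrayList<>();
        handlers.add(ctx -> {
            ctx.log("first");
            if (firstPropagates) {
                ctx.fireActive(); // without this call the event stops here
            }
        });
        handlers.add(ctx -> ctx.log("second"));

        // Index -1 plays the role of the head: it starts the event at the first handler.
        new Context(handlers, -1, log).fireActive();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```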