The multiple-Selector layout in NIO frameworks
2019-06-14


As concurrency levels rise, the traditional NIO design of using a single Selector to manage and dispatch events for a large number of connections has hit a bottleneck. The newer versions of the various NIO frameworks therefore adopt a multi-Selector layout, in which several Selectors share the work of managing many connections. This article takes the implementations in Mina and Grizzly as examples.

In Mina 2.0, Selector management is handled by org.apache.mina.transport.socket.nio.NioProcessor. Each NioProcessor object holds one Selector and is responsible for the concrete select and wakeup calls, channel registration and cancellation, registering and testing read/write interest, and the actual IO reads and writes. The core code is as follows:

public NioProcessor(Executor executor) {
    super(executor);
    try {
        // Open a new selector
        selector = Selector.open();
    } catch (IOException e) {
        throw new RuntimeIoException("Failed to open a selector.", e);
    }
}

protected int select(long timeout) throws Exception {
    return selector.select(timeout);
}

protected boolean isInterestedInRead(NioSession session) {
    SelectionKey key = session.getSelectionKey();
    return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0;
}

protected boolean isInterestedInWrite(NioSession session) {
    SelectionKey key = session.getSelectionKey();
    return key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
}

protected int read(NioSession session, IoBuffer buf) throws Exception {
    return session.getChannel().read(buf.buf());
}

protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
    if (buf.remaining() <= length) {
        return session.getChannel().write(buf.buf());
    } else {
        int oldLimit = buf.limit();
        buf.limit(buf.position() + length);
        try {
            return session.getChannel().write(buf.buf());
        } finally {
            buf.limit(oldLimit);
        }
    }
}



These methods are invoked through AbstractPollingIoProcessor, a class in which you can see the core logic of an NIO framework: registration, select, and dispatch. Since the details fall outside the topic of this article, they are not expanded here. NioProcessor is initialized in the constructor of NioSocketAcceptor:

public NioSocketAcceptor() {
    super(new DefaultSocketSessionConfig(), NioProcessor.class);
    ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}

This directly calls the constructor of the parent class AbstractPollingIoAcceptor, where we can see that by default a SimpleIoProcessorPool is started to wrap NioProcessor:

protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
        Class<? extends IoProcessor<T>> processorClass) {
    this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass),
            true);
}

This is in fact a composite pattern: SimpleIoProcessorPool and NioProcessor both implement the IoProcessor interface, one being a pool of Processors formed by composition, the other a standalone class. The SimpleIoProcessorPool constructor invoked here looks like this:

private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;

public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
    this(processorType, null, DEFAULT_SIZE);
}

As you can see, the default pool size is the number of CPUs plus one, i.e. cpu+1 Selector objects are created. The overloaded constructor creates an array, starts a CachedThreadPool to run the NioProcessors, and creates the concrete Processor objects via reflection; that code is not listed here.
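The construction path just described can be sketched roughly as follows. This is a simplified, hypothetical illustration, not Mina's actual code; the class and field names are made up, and only the reflective constructor lookup mirrors what SimpleIoProcessorPool really does:

```java
import java.lang.reflect.Constructor;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of SimpleIoProcessorPool's overloaded constructor:
// create an executor, then instantiate `size` processors reflectively.
public class ProcessorPoolSketch<T> {
    private final Object[] pool;
    // Mina starts a CachedThreadPool to run the processors
    private final ExecutorService executor = Executors.newCachedThreadPool();

    public ProcessorPoolSketch(Class<T> processorType, int size) throws Exception {
        pool = new Object[size];
        // Look up the (Executor) constructor reflectively, as Mina does,
        // and fill the array with `size` processor instances
        Constructor<T> c = processorType.getConstructor(Executor.class);
        for (int i = 0; i < size; i++) {
            pool[i] = c.newInstance(executor);
        }
    }

    public int size() {
        return pool.length;
    }

    // Trivial stand-in for NioProcessor, used only to demonstrate the pool
    public static class DummyProcessor {
        public DummyProcessor(Executor executor) { }
    }
}
```

With DEFAULT_SIZE processors each holding its own Selector, the pool as a whole owns cpu+1 Selectors.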

When a new connection is established, Mina creates a NioSocketSession and hands it the SimpleIoProcessorPool above; when the connection is initialized, the session is added to the SimpleIoProcessorPool:


protected NioSession accept(IoProcessor<NioSession> processor,
        ServerSocketChannel handle) throws Exception {
    SelectionKey key = handle.keyFor(selector);

    if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
        return null;
    }
    // accept the connection from the client
    SocketChannel ch = handle.accept();

    if (ch == null) {
        return null;
    }
    return new NioSocketSession(this, processor, ch);
}

private void processHandles(Iterator<H> handles) throws Exception {
    while (handles.hasNext()) {
        H handle = handles.next();
        handles.remove();
        // Associates a new created connection to a processor,
        // and get back a session
        T session = accept(processor, handle);

        if (session == null) {
            break;
        }
        initSession(session, null, null);
        // add the session to the SocketIoProcessor
        session.getProcessor().add(session);
    }
}


The add operation increments an integer counter and, taking it modulo the array size, registers the NioProcessor at that index with the session:

private IoProcessor<T> nextProcessor() {
    checkDisposal();
    return pool[Math.abs(processorDistributor.getAndIncrement()) % pool.length];
}

if (p == null) {
    p = nextProcessor();
    IoProcessor<T> oldp =
            (IoProcessor<T>) session.setAttributeIfAbsent(PROCESSOR, p);
    if (oldp != null) {
        p = oldp;
    }
}

In this way every connection is associated with one NioProcessor, and hence with one Selector object, avoiding the slow server responses caused by all connections sharing a single overloaded Selector. But notice that NioSocketAcceptor also has a Selector of its own. What is it for? It is the Selector that handles OP_ACCEPT events centrally: it deals only with accepting connections and is kept apart from the Selectors handling read/write events. So by default Mina opens cpu+2 Selectors in total.
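The overall layout can be sketched like this: one Selector dedicated to OP_ACCEPT, plus an array of worker Selectors that each own the read/write events of the connections assigned to them by round-robin. This is an illustrative skeleton with made-up names, not code from Mina:

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the multi-Selector layout: accept is isolated on its own
// Selector, IO events are spread round-robin over the worker Selectors.
public class MultiSelectorSketch {
    final Selector acceptSelector;      // handles OP_ACCEPT only
    final Selector[] ioSelectors;       // handle OP_READ / OP_WRITE
    private final AtomicInteger counter = new AtomicInteger();

    public MultiSelectorSketch(int ioSelectorCount) throws IOException {
        acceptSelector = Selector.open();
        ioSelectors = new Selector[ioSelectorCount];
        for (int i = 0; i < ioSelectorCount; i++) {
            ioSelectors[i] = Selector.open();
        }
    }

    // Round-robin assignment, mirroring Mina's nextProcessor()
    Selector nextIoSelector() {
        return ioSelectors[Math.abs(counter.getAndIncrement()) % ioSelectors.length];
    }

    // Each accepted connection is pinned to one worker Selector for its lifetime
    void registerConnection(SocketChannel ch) throws IOException {
        ch.configureBlocking(false);
        ch.register(nextIoSelector(), SelectionKey.OP_READ);
    }
}
```

A real implementation would also run one select loop per Selector on its own thread; the sketch only shows the assignment structure.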

Having looked at Mina 2.0, let us see how Grizzly 2.0 handles this. Grizzly is more conservative: by default it starts just two Selectors, one dedicated to accept and the other to managing the connections' read/write IO events. In Grizzly 2.0, Selector management goes through the SelectorRunner class, which wraps the Selector object and the core dispatch and registration logic; you can think of it as the counterpart of Mina's NioProcessor. The core code is as follows:

protected boolean doSelect() {
    selectorHandler = transport.getSelectorHandler();
    selectionKeyHandler = transport.getSelectionKeyHandler();
    strategy = transport.getStrategy();

    try {
        if (isResume) {
            // If resume SelectorRunner - finish postponed keys
            isResume = false;
            if (keyReadyOps != 0) {
                if (!iterateKeyEvents()) return false;
            }

            if (!iterateKeys()) return false;
        }
        lastSelectedKeysCount = 0;

        selectorHandler.preSelect(this);

        readyKeys = selectorHandler.select(this);
        if (stateHolder.getState(false) == State.STOPPING) return false;

        lastSelectedKeysCount = readyKeys.size();

        if (lastSelectedKeysCount != 0) {
            iterator = readyKeys.iterator();
            if (!iterateKeys()) return false;
        }
        selectorHandler.postSelect(this);
    } catch (ClosedSelectorException e) {
        notifyConnectionException(key,
                "Selector was unexpectedly closed", e,
                Severity.TRANSPORT, Level.SEVERE, Level.FINE);
    } catch (Exception e) {
        notifyConnectionException(key,
                "doSelect exception", e,
                Severity.UNKNOWN, Level.SEVERE, Level.FINE);
    } catch (Throwable t) {
        logger.log(Level.SEVERE, "doSelect exception", t);
        transport.notifyException(Severity.FATAL, t);
    }
    return true;
}


This is essentially the shape of a reactor implementation. The AbstractNIOTransport class maintains an array of SelectorRunners, and TCPNIOTransport, the class Grizzly uses to build a TCP server, extends AbstractNIOTransport. Its start method calls startSelectorRunners to create and start the SelectorRunner array:

private static final int DEFAULT_SELECTOR_RUNNERS_COUNT = 2;

@Override
public void start() throws IOException {
    if (selectorRunnersCount <= 0) {
        selectorRunnersCount = DEFAULT_SELECTOR_RUNNERS_COUNT;
    }
    startSelectorRunners();
}

protected void startSelectorRunners() throws IOException {
    selectorRunners = new SelectorRunner[selectorRunnersCount];

    synchronized (selectorRunners) {
        for (int i = 0; i < selectorRunnersCount; i++) {
            SelectorRunner runner =
                    new SelectorRunner(this, SelectorFactory.instance().create());
            runner.start();
            selectorRunners[i] = runner;
        }
    }
}

So Grizzly does not use a separate pool object to manage its SelectorRunners; it manages them directly in an array, with a default size of 2. SelectorRunner implements Runnable, and its start method uses a thread pool to run itself. I mentioned above that in Grizzly accept is managed by a Selector of its own; where does that show up? The answer is in the RoundRobinConnectionDistributor class, which dispatches registration events to the appropriate SelectorRunner as follows:

public Future<RegisterChannelResult> registerChannelAsync(
        SelectableChannel channel, int interestOps, Object attachment,
        CompletionHandler completionHandler)
        throws IOException {
    SelectorRunner runner = getSelectorRunner(interestOps);

    return transport.getSelectorHandler().registerChannelAsync(
            runner, channel, interestOps, attachment, completionHandler);
}

private SelectorRunner getSelectorRunner(int interestOps) {
    SelectorRunner[] runners = getTransportSelectorRunners();
    int index;
    if (interestOps == SelectionKey.OP_ACCEPT || runners.length == 1) {
        index = 0;
    } else {
        index = (counter.incrementAndGet() % (runners.length - 1)) + 1;
    }

    return runners[index];
}


The getSelectorRunner method gives the game away: OP_ACCEPT registrations always use the first SelectorRunner in the array, while everything else is assigned one of the remaining SelectorRunners, chosen by a modulo counter plus one.
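The index arithmetic can be checked with a small worked example. This is a standalone rewrite of the formula for illustration, not Grizzly's code, and the counter is passed in explicitly instead of being an AtomicInteger field:

```java
import java.nio.channels.SelectionKey;

// Standalone re-statement of RoundRobinConnectionDistributor's index formula
public class RunnerIndexDemo {
    static int index(int interestOps, int runnerCount, int counterValue) {
        if (interestOps == SelectionKey.OP_ACCEPT || runnerCount == 1) {
            return 0;  // accept always lands on the first runner
        }
        // everything else cycles over runners[1] .. runners[runnerCount-1]
        return (counterValue % (runnerCount - 1)) + 1;
    }
}
```

With three runners, successive read/write registrations land on runners[2], runners[1], runners[2], ... while runners[0] stays reserved for accept.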

Having analyzed how Mina 2.0 and Grizzly 2.0 manage their Selectors, we can draw a few lessons:

1. When handling large numbers of connections, multiple Selectors beat a single Selector.

2. With multiple Selectors, those handling OP_READ and OP_WRITE should be separated from the one handling OP_ACCEPT; in other words, connection accept deserves a dedicated Selector object, so that IO read/write events cannot slow down the accept rate.

3. On the number of Selectors: Mina defaults to cpu+2, while Grizzly uses just 2 in total. I lean towards Mina's policy, but I think it should take the CPU count into account: beyond 8 CPUs, additional Selector threads may incur substantial thread-switching overhead, so Mina's default policy is not ideal there. Fortunately the value is configurable.
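A capping policy along those lines could look like this. The threshold of 8 comes from the discussion above; the method name is made up, and neither framework actually applies such a cap by default:

```java
// Sketch: cap the number of IO selectors so that many-core machines
// do not pay excessive thread-switching overhead.
public class SelectorCount {
    static int ioSelectors(int cpus) {
        // Mina's default is cpus + 1 IO selectors (plus one for accept);
        // 8 is the cap suggested in the text, not a framework constant.
        return Math.min(cpus + 1, 8);
    }
}
```

On a 4-core machine this yields 5 IO Selectors, matching Mina's default; on a 16-core machine it stops at 8 instead of 17.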
