盘一盘 NIO (三)—— Selector解析
Selector是个啥?
继承关系图
public abstract class Selector implements Closeable { // 构造方法 protected Selector() { } // 打开选择器 public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); } // 判断此选择器是否已打开。 public abstract boolean isOpen(); // 返回创建Channel的选择器生产者 public abstract SelectorProvider provider(); // 返回选择器的key set public abstract Set<SelectionKey> keys(); // 返回此选择器的selected-key集。 public abstract Set<SelectionKey> selectedKeys(); // 对选择的一组键所对应的通道准备进行IO操作。 public abstract int selectNow() throws IOException; public abstract int select(long timeout) throws IOException; public abstract int select() throws IOException; // 立即返回第一个尚未完成的选择器操作。 public abstract Selector wakeup(); // 关闭选择器 public abstract void close() throws IOException; }
SelectableChannel是个啥?
/**
 * A channel that can be multiplexed via a {@link Selector} (excerpt of the
 * JDK's {@code java.nio.channels.SelectableChannel} as quoted by this article).
 *
 * <p>Only the two-argument {@link #register(Selector, int)} has a body here;
 * it forwards to the three-argument overload with a {@code null} attachment.
 */
public abstract class SelectableChannel
        extends AbstractInterruptibleChannel
        implements Channel {

    /** Constructor; only subclasses may instantiate a selectable channel. */
    protected SelectableChannel() { }

    /** Returns the provider that created this channel. */
    public abstract SelectorProvider provider();

    /** Returns the set of operations this channel supports. */
    public abstract int validOps();

    /** Tells whether this channel is currently registered with a selector. */
    public abstract boolean isRegistered();

    /** Returns the key for this channel's registration with {@code sel}. */
    public abstract SelectionKey keyFor(Selector sel);

    /**
     * Registers this channel with the given selector.
     *
     * @param sel the selector to register with
     * @param ops the interest set for the resulting key
     * @param att the attachment for the resulting key; may be {@code null}
     * @return the key representing the registration
     * @throws ClosedChannelException if this channel is closed
     */
    public abstract SelectionKey register(Selector sel, int ops, Object att)
            throws ClosedChannelException;

    /**
     * Convenience overload: registers with no attachment.
     *
     * @param sel the selector to register with
     * @param ops the interest set for the resulting key
     * @return the key representing the registration
     * @throws ClosedChannelException if this channel is closed
     */
    public final SelectionKey register(Selector sel, int ops)
            throws ClosedChannelException {
        return register(sel, ops, null);
    }

    /**
     * Adjusts this channel's blocking mode.
     *
     * @param block {@code true} for blocking mode, {@code false} for non-blocking
     * @return this channel
     * @throws IOException if an I/O error occurs
     */
    public abstract SelectableChannel configureBlocking(boolean block)
            throws IOException;

    /** Tells whether this channel is in blocking mode. */
    public abstract boolean isBlocking();

    /** Returns the lock object guarding blocking-mode changes. */
    public abstract Object blockingLock();
}
SelectionKey是个啥?
继承关系图
抽象类方法
/**
 * A token representing the registration of a {@link SelectableChannel} with a
 * {@link Selector} (excerpt of the JDK's {@code java.nio.channels.SelectionKey}
 * as quoted by this article).
 *
 * <p>The concrete parts are the four operation bit constants, the
 * {@code isXxx()} helpers that test those bits against {@link #readyOps()},
 * and the attachment slot.
 */
public abstract class SelectionKey {

    /** Constructor; only subclasses may instantiate a key. */
    protected SelectionKey() { }

    /** Returns the channel this key was created for. */
    public abstract SelectableChannel channel();

    /** Returns the selector this key was created for. */
    public abstract Selector selector();

    /** Tells whether this key is valid. */
    public abstract boolean isValid();

    /** Cancels the registration this key represents. */
    public abstract void cancel();

    /** Returns this key's interest set. */
    public abstract int interestOps();

    /** Sets this key's interest set to {@code ops} and returns a key for chaining. */
    public abstract SelectionKey interestOps(int ops);

    /** Returns this key's ready-operation set. */
    public abstract int readyOps();

    // The four operation bits: read, write, connect, accept.
    // Note that bit 1 (value 2) is not used by any of these constants.
    public static final int OP_READ = 1 << 0;
    public static final int OP_WRITE = 1 << 2;
    public static final int OP_CONNECT = 1 << 3;
    public static final int OP_ACCEPT = 1 << 4;

    /** Returns {@code true} if the ready set contains {@link #OP_READ}. */
    public final boolean isReadable() {
        return (readyOps() & OP_READ) != 0;
    }

    /** Returns {@code true} if the ready set contains {@link #OP_WRITE}. */
    public final boolean isWritable() {
        return (readyOps() & OP_WRITE) != 0;
    }

    /** Returns {@code true} if the ready set contains {@link #OP_CONNECT}. */
    public final boolean isConnectable() {
        return (readyOps() & OP_CONNECT) != 0;
    }

    /** Returns {@code true} if the ready set contains {@link #OP_ACCEPT}. */
    public final boolean isAcceptable() {
        return (readyOps() & OP_ACCEPT) != 0;
    }

    // Object attached to this key; volatile so it can be swapped atomically
    // through the field updater below.
    private volatile Object attachment = null;

    private static final AtomicReferenceFieldUpdater<SelectionKey, Object>
        attachmentUpdater = AtomicReferenceFieldUpdater.newUpdater(
            SelectionKey.class, Object.class, "attachment"
        );

    /**
     * Attaches the given object to this key, replacing any previous attachment.
     *
     * @param ob the object to attach; may be {@code null}
     * @return the previously attached object, or {@code null} if there was none
     */
    public final Object attach(Object ob) {
        return attachmentUpdater.getAndSet(this, ob);
    }

    /** Returns the object currently attached to this key, or {@code null}. */
    public final Object attachment() {
        return attachment;
    }
}
关键方法解析
在了解了 Selector、SelectableChannel、SelectionKey 的概念之后,我们进一步深入源码。
open方法
打开选择器,创建Selector对象
/**
 * Opens a selector: asks {@code SelectorProvider.provider()} for the provider
 * and has that provider open the selector.
 *
 * @return the newly opened selector
 * @throws IOException if the provider fails to open the selector
 */
public static Selector open() throws IOException {
    final SelectorProvider selectorProvider = SelectorProvider.provider();
    return selectorProvider.openSelector();
}
public static SelectorProvider provider() { synchronized (lock) { // 判断provider是否已经产生,若已产生则直接返回 if (provider != null) return provider; // 若未产生,则需要调用AccessController的静态方法doPrivileged,创建一个新的Selector对象 return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
select方法
// 由其基类SelectorImpl中实现 public int select() throws IOException { return this.select(0L); } public int select(long var1) throws IOException { if (var1 < 0L) { throw new IllegalArgumentException("Negative timeout"); } else { // slect()无参时默认传入0,实则交给lockAndDoSelect方法去完成,并且令参数为-1 return this.lockAndDoSelect(var1 == 0L ? -1L : var1); } } private int lockAndDoSelect(long var1) throws IOException { synchronized(this) { // 先判断当前的Selector对象是否关闭 if (!this.isOpen()) { throw new ClosedSelectorException(); } else { // 分别以publicKeys以及publicSelectedKeys为锁,最终的实现交给抽象方法doSelect完成; int var10000; synchronized(this.publicKeys) { synchronized(this.publicSelectedKeys) { // doSelect方法由WindowsSelectorImpl实现 var10000 = this.doSelect(var1); } } return var10000; } } }
doSelect 方法在 Windows 平台上由 WindowsSelectorImpl 实现(其他平台由各自对应的实现类完成):
// channelArray是一个SelectionKeyImpl数组,SelectionKeyImpl负责记录Channel和SelectionKey状态 // channelArray是根据连接的Channel数量动态维持的,初始化大小是8。 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[8]; protected int doSelect(long var1) throws IOException { if (this.channelArray == null) { throw new ClosedSelectorException(); } else { this.timeout = var1; // 调用processDeregisterQueue方法来取消准备撤销的集合 this.processDeregisterQueue(); if (this.interruptTriggered) { // 若是发生了中断,调用resetWakeupSocket方法恢复中断 this.resetWakeupSocket(); return 0; } else { // 未发生中断,调用adjustThreadsCount调整轮询线程数量 this.adjustThreadsCount(); this.finishLock.reset(); this.startLock.startThreads(); try { this.begin(); try { this.subSelector.poll(); } catch (IOException var7) { this.finishLock.setException(var7); } if (this.threads.size() > 0) { this.finishLock.waitForHelperThreads(); } } finally { this.end(); } this.finishLock.checkForException(); this.processDeregisterQueue(); int var3 = this.updateSelectedKeys(); this.resetWakeupSocket(); return var3; } } }
Selector使用示例
服务端
public static void main(String[] args) throws Exception { try { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8000)); ssc.configureBlocking(false); Selector selector = Selector.open(); // 注册 channel,并且指定感兴趣的事件是 Accept ssc.register(selector, SelectionKey.OP_ACCEPT); ByteBuffer readBuff = ByteBuffer.allocate(1024); ByteBuffer writeBuff = ByteBuffer.allocate(128); writeBuff.put("received".getBytes()); writeBuff.flip(); while (true) { int nReady = selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isAcceptable()) { // 创建新的连接,并且把连接注册到selector上 // 声明这个channel只对读操作感兴趣。 SocketChannel socketChannel = ssc.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); readBuff.clear(); socketChannel.read(readBuff); readBuff.flip(); System.out.println("received : " + new String(readBuff.array())); key.interestOps(SelectionKey.OP_WRITE); } else if (key.isWritable()) { writeBuff.rewind(); SocketChannel socketChannel = (SocketChannel) key.channel(); socketChannel.write(writeBuff); key.interestOps(SelectionKey.OP_READ); } } } } catch (IOException e) { e.printStackTrace(); } }
客户端
/**
 * Demo client: connects to 127.0.0.1:8000 and, once per second, sends
 * {@code "hello"} and reads the server's reply into a buffer.
 *
 * <p>The loop ends when the server closes the connection or an
 * {@link IOException} occurs; the exception is printed rather than swallowed.
 *
 * @param args unused
 * @throws Exception e.g. {@link InterruptedException} from the sleep
 */
public static void main(String[] args) throws Exception {
    // try-with-resources closes the socket on every exit path.
    try (SocketChannel socketChannel = SocketChannel.open()) {
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));

        ByteBuffer writeBuffer = ByteBuffer.allocate(32);
        ByteBuffer readBuffer = ByteBuffer.allocate(32);
        writeBuffer.put("hello".getBytes());
        writeBuffer.flip();

        while (true) {
            Thread.sleep(1000);

            // Re-send the same payload from the start of the buffer.
            writeBuffer.rewind();
            socketChannel.write(writeBuffer);

            readBuffer.clear();
            if (socketChannel.read(readBuffer) < 0) {
                // Server closed the connection; stop instead of looping forever.
                break;
            }
        }
    } catch (IOException e) {
        // Report the failure instead of silently ignoring it.
        e.printStackTrace();
    }
}