前言
读者可根据需要选择不同的小节进行阅读。
Java AIO描述
Java AIO例子
AsynchronousServerSocketChannel.open()原理
AsynchronousServerSocketChannel.bind()原理
serverSocketChannel.accept(null, new AcceptHandler())原理
socketChannel.read(byteBuffer, byteBuffer, new ReadHandler())原理
附录一 Linux 服务端网络编程
附录二 Linux 客户端网络编程
附录三 Java AIO涉及到的Native JNI实现
一、Java AIO描述
二、Java AIO例子
三、
AsynchronousServerSocketChannel.open()原理
public static AsynchronousServerSocketChannel open() throws IOException{// 不指定异步通道组return open(null);}public static AsynchronousServerSocketChannel open(AsynchronousChannelGroup group)throws IOException{// 如果是同一个组的,自然AsynchronousServerSocketChannel的提供者AsynchronousChannelProvider要保持一致,不过我们这里不是AsynchronousChannelProvider provider = (group == null) ?AsynchronousChannelProvider.provider() : group.provider();return provider.openAsynchronousServerSocketChannel(group);}
我们看到最后我们需要一个提供AsynchronousServerSocketChannel的AsynchronousChannelProvider,毕竟Java跨平台,有windows的和Linux的对吧。
public static AsynchronousChannelProvider provider() {return ProviderHolder.provider;}private static class ProviderHolder {// 类加载时才初始化providerstatic final AsynchronousChannelProvider provider = load();private static AsynchronousChannelProvider load() {return AccessController.doPrivileged(new PrivilegedAction<AsynchronousChannelProvider>() {public AsynchronousChannelProvider run() {AsynchronousChannelProvider p;// 先通过Property获取p = loadProviderFromProperty();if (p != null)return p;// 否则通过ServiceLoader来获取p = loadProviderAsService();if (p != null)return p;// 如果都没有指定,那么我们使用DefaultAsynchronousChannelProvider创建默认的提供者return sun.nio.ch.DefaultAsynchronousChannelProvider.create();}});}private static AsynchronousChannelProvider loadProviderFromProperty() {// 当然我们没有设置,所以自然为nullString cn = System.getProperty("java.nio.channels.spi.AsynchronousChannelProvider");if (cn == null)return null;Class<?> c = Class.forName(cn, true,ClassLoader.getSystemClassLoader());return (AsynchronousChannelProvider)c.newInstance();}private static AsynchronousChannelProvider loadProviderAsService() {// 我们也没有使用Java的SPI接口,也即没有设置META-INF/services,所以自然也没有ServiceLoader<AsynchronousChannelProvider> sl =ServiceLoader.load(AsynchronousChannelProvider.class,ClassLoader.getSystemClassLoader());Iterator<AsynchronousChannelProvider> i = sl.iterator();for (;;) {try {return (i.hasNext()) ? i.next() : null;} catch (ServiceConfigurationError sce) {if (sce.getCause() instanceof SecurityException) {// Ignore the security exception, try the next providercontinue;}throw sce;}}}}
我们知道我们没有通过系统变量设置,也没有使用SPI接口,所以只能创建默认的提供者,我们直接看源码。
public static AsynchronousChannelProvider create() {String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));if (osname.equals("SunOS"))return createProvider("sun.nio.ch.SolarisAsynchronousChannelProvider");// 注意这里,我们研究对象是Linuxif (osname.equals("Linux"))return createProvider("sun.nio.ch.LinuxAsynchronousChannelProvider");if (osname.contains("OS X"))return createProvider("sun.nio.ch.BsdAsynchronousChannelProvider");throw new InternalError("platform not recognized");}
private static AsynchronousChannelProvider createProvider(String cn) {Class<AsynchronousChannelProvider> c;try {c = (Class<AsynchronousChannelProvider>)Class.forName(cn);} catch (ClassNotFoundException x) {throw new AssertionError(x);}try {return c.newInstance(); // 直接调用默认构造器初始化} catch (IllegalAccessException | InstantiationException x) {throw new AssertionError(x);}}
我们来看看LinuxAsynchronousChannelProvider的构造函数。
// 什么也没干public LinuxAsynchronousChannelProvider() {}
最后我们可以研究provider.openAsynchronousServerSocketChannel(group)如何创AsynchronousServerSocketChannel了。
public AsynchronousServerSocketChannel openAsynchronousServerSocketChannel(AsynchronousChannelGroup group) throws IOException{return new UnixAsynchronousServerSocketChannelImpl(toPort(group));}private Port toPort(AsynchronousChannelGroup group) throws IOException {if (group == null) {// 我们这里group为null,所以创建默认的EventPortreturn defaultEventPort();} else {if (!(group instanceof EPollPort))throw new IllegalChannelGroupException();return (Port)group;}}// 创建默认的EventPortprivate EPollPort defaultEventPort() throws IOException {if (defaultPort == null) {synchronized (LinuxAsynchronousChannelProvider.class) {if (defaultPort == null) {// 创建EPollPort。注意这里的ThreadPool.getDefault表示默认线程池,然后,还start了?这里启动了线程defaultPort = new EPollPort(this, ThreadPool.getDefault()).start();}}}return defaultPort;}// 默认线程池static ThreadPool getDefault() {return DefaultThreadPoolHolder.defaultThreadPool;}// 返回全局单例默认线程池private static class DefaultThreadPoolHolder {final static ThreadPool defaultThreadPool = createDefault();}// 创建默认线程池static ThreadPool createDefault() {// 我们可以通过系统变量来设置,同样我们根本没有设置,所以返回-1int initialSize = getDefaultThreadPoolInitialSize();if (initialSize < 0) // 那么我们这里默认就是CPU的核心数initialSize = Runtime.getRuntime().availableProcessors();ThreadFactory threadFactory = getDefaultThreadPoolThreadFactory();if (threadFactory == null)threadFactory = defaultThreadFactory;// 通过Executors创建了Cached线程池,读者可能会说,这里不是说CachedThreadPool线程无限大吗?别着急,看我们下面的ThreadPool对它进行了包装ExecutorService executor = Executors.newCachedThreadPool(threadFactory);return new ThreadPool(executor, false, initialSize);}// ThreadPool构造器,保存真正执行器executor和最大线程数,有这个约束,就不怕CachedThreadPool线程大过poolSize了private ThreadPool(ExecutorService executor,boolean isFixed,int poolSize){this.executor = executor;this.isFixed = isFixed;this.poolSize = poolSize;}
那么我们继续研究new EPollPort(this, ThreadPool.getDefault()).start()干了什么?
// EPollPort构造器。初始化epfd,socketpair,添加sv[0]的epoll监听数据读事件,最后分配事件队列EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)throws IOException{super(provider, pool);// 创建了一个epoll的fd,这里就到JVM了,最后又会回到Glibc的epoll_create我们不聊这个,了解即可this.epfd = epollCreate();// 创建套接字对,创建一对CP用于唤醒操作int[] sv = new int[2];try {// 这里就又跑到JVM底层啦,这里其实是通过socketpair函数 创建套接字对,用于进程或者线程间的通讯操作。当然我们也可以在同一个进程中使用这个套接字对写入或者读取。读者可以类比于Java ByteArrayInputStream和ByteArrayOutStreamsocketpair(sv);// 通过epoll的ctl控制操作添加一个sv[0]的fd,监听的操作为POLLIN,表明读操作。这时我们就可以通过sv[1]向sv[0]发送事件,这时会被epoll捕捉到这个读事件epollCtl(epfd, EPOLL_CTL_ADD, sv[0], POLLIN);} catch (IOException x) {close0(epfd);throw x;}// 保存进程通讯的套接字this.sp = sv;// 分配轮询事件的数组,这里MAX_EPOLL_EVENTS为512this.address = allocatePollArray(MAX_EPOLL_EVENTS);// 创建Java的ArrayBlockingQueue用于保存Java层面的事件this.queue = new ArrayBlockingQueue<Event>(MAX_EPOLL_EVENTS);// 然后我们向队列中放入一个事件对象NEED_TO_POLL,表明当前队列需要线程进行轮询处理事件this.queue.offer(NEED_TO_POLL);}// 开始启动线程处理队列事件EPollPort start() {startThreads(new EventHandlerTask());return this;}// 在线程池中开始处理EventHandlerTask对象protected final void startThreads(Runnable task) {// 由于我们传入的是ThreadPool对象,里面持有了CachedThreadPool,并且不是fixed线程池。所以这里isFixedThreadPool()将返回falseif (!isFixedThreadPool()) {// 那么我们启动内部线程处理IO事件,注意这里的internalThreadCount默认为1for (int i=0; i<internalThreadCount; i++) {startInternalThread(task);threadCount.incrementAndGet();}}// 然后我们看看线程池的大小是否大于0,我们这里设置的是等于CPU核心数,所以满足条件if (pool.poolSize() > 0) {// 包装外部的EventHandlerTask任务对象,向处理线程绑定当前AsynchronousChannelGroupImpl对象。内部通过ThreadLocal来传递task = bindToGroup(task);try {// 向线程池中提交任务执行,并处理IO事件for (int i=0; i<pool.poolSize(); i++) {pool.executor().execute(task);threadCount.incrementAndGet();}} catch (RejectedExecutionException x) {}}}
在上面的EPollPort start方法中,我们启动了内部线程和线程池,并且向其中执行了我们传入的EventHandlerTask对象。
并且根据我们设置的线程池大小,向其中放入对应大小的任务对象,所以我们可以看到是多个线程同时处理EventHandlerTask对象。
这里我们来看看EventHandlerTask的Runnable方法,看看它里面做了什么工作。
private class EventHandlerTask implements Runnable {// 对epoll进行轮询,获取事件准备好的channel,也即网络事件private Event poll() throws IOException {try {for (;;) {// 还记得我们调用allocatePollArray方法开辟的C层面用于服务epoll的队列吗?这个队列这里就用到了,当我们调用epollWait时,内核会将准备好的事件放入到address的队列中返回。返回值n为准备好的事件数。int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);fdToChannelLock.readLock().lock(); // 获取通道读锁try {// 处理网络事件while (n-- > 0) {// 获取已经准备好事件的fdlong eventAddress = getEvent(address, n);int fd = getDescriptor(eventAddress);// 如果fd为之前我们设置的socketpair。那么唤醒等待数据的线程if (fd == sp[0]) {// 等待数据的线程等待唤醒计数为0,那么表明没有线程需要当前套接字对的数据,所以我们调用drain1函数,将队列里所有的数据全部清除,避免内存浪费if (wakeupCount.decrementAndGet() == 0) {drain1(sp[0]);}// 我们通过n--来对n进行操作,这里判断减1后的n是否大于0,表明看看队列里是否还有更多的准备好的事件,如果有,那么我们向队列中放入EXECUTE_TASK_OR_SHUTDOWN事件if (n > 0) {queue.offer(EXECUTE_TASK_OR_SHUTDOWN);continue;}return EXECUTE_TASK_OR_SHUTDOWN;}// 此时fd为正常网络事件,那么我们从之前保存fd到channel的映射表中获取到准备好的网络事件PollableChannel channel = fdToChannel.get(fd);if (channel != null) {// 那么我们现在封装好Event对象并返回int events = getEvents(eventAddress);Event ev = new Event(channel, events);// 如果队列里还有别的需要处理的事件,那么我们将当前ev放入队列,由其他线程获取执行,如果是最后一个事件对象,那么我们直接返回,由当前线程完成if (n > 0) {queue.offer(ev);} else {return ev;}}}} finally {fdToChannelLock.readLock().unlock();}}} finally {// 向队列中放入NEED_TO_POLL事件,注意我们这个队列是公用的,有多个线程同时指向当时任务,当前线程已经处理了自己获取的事件,那么设置这个NEED_TO_POLL事件,告诉其他线程可以进行轮询epoll事件queue.offer(NEED_TO_POLL);}}// 线程启动后直接执行public void run() {// 从ThreadLocal中获取到我们之前通过 task = bindToGroup(task) 放入的GroupAndInvokeCount对象Invoker.GroupAndInvokeCount myGroupAndInvokeCount =Invoker.getGroupAndInvokeCount();// 通过该标识判断,当前任务是否在ThreadPool中运行。前面我们看到内部会通过startInternalThread方法开启一个内部线程处理当前任务,剩余的任务对象将在ThreadPool中执行final boolean isPooledThread = (myGroupAndInvokeCount != null);boolean replaceMe = false;Event ev;try {// 循环处理事件for (;;) {// 如果在ThreadPool中执行,那么重置InvokeCount调用计数if (isPooledThread)myGroupAndInvokeCount.resetInvokeCount();try {replaceMe = false;// 从队列中获取事件ev = queue.take();// 注意,我们在EPollPort构造器的最后传入了该事件对象NEED_TO_POLL,该对象也指示当前工作线程应该调用poll方法获取网络事件if (ev == NEED_TO_POLL) {try {ev = poll();} catch (IOException x) {x.printStackTrace();return;}}} catch (InterruptedException x) { // 忽略中断异常,将会继续执行事件continue;}// 如果事件设置了EXECUTE_TASK_OR_SHUTDOWN那么,我们从全局taskQueue中获取任务执行,当然,如果任务为空,那么执行shutdown操作。在AsynchronousChannelGroupImpl的构造方法中对taskQueue进行赋值,不过这个taskQueue是针对于Fixed固定线程池操作的,所以我们当前传入的ThreadPool不是固定线程池,所以taskQueue为空。所以taskQueue为null,而pollTask中如果pollTask为null,那么返回的task也是null,所以只要我们这里遇到EXECUTE_TASK_OR_SHUTDOWN事件,那么必然是shutdown 结束当前线程if (ev == EXECUTE_TASK_OR_SHUTDOWN) {Runnable task = pollTask();if (task == null) {return;}replaceMe = true;task.run();continue;}// 开始处理事件try {// 通过调用channel的onEvent执行事件ev.channel().onEvent(ev.events(), isPooledThread);} catch (Error x) {replaceMe = true; throw x;} catch (RuntimeException x) {replaceMe = true; throw x;}}} finally {// 当线程退出后执行释放资源int remaining = threadExit(this, replaceMe);if (remaining == 0 && isShutdown()) {implClose();}}}}
读者一定要注意这里的EventHandlerTask是多个线程同时处理,所以其中如果我们发现还有别的事件需要处理,那么我们优先将事件放入队列中,由其他线程获取并执行,并且由于我们并没有使用固定线程池,所以我们这里的taskQueue为空。
public void onEvent(int events, boolean mayInvokeDirect) {// 获取更新锁,检测acceptPending标志位synchronized (updateLock) {if (!acceptPending)return;acceptPending = false;}FileDescriptor newfd = new FileDescriptor();InetSocketAddress[] isaa = new InetSocketAddress[1]; // 用于保存客户端的地址信息Throwable exc = null;try {begin(); // 获取读锁// 获取准备好的请求事件int n = accept0(this.fd, newfd, isaa);// 如果返回UNAVAILABLE,表明当前线程被虚假唤醒,因为我们是通过epoll wait拿到的网络事件,并且我们检测了返回的事件数,所以这种情况是不可能出现的if (n == IOStatus.UNAVAILABLE) {// 如果真的出现,那么我们复位acceptPending并且重新轮询当前fdsynchronized (updateLock) {acceptPending = true;}port.startPoll(fdVal, Port.POLLIN);return;}} catch (Throwable x) {if (x instanceof ClosedChannelException)x = new AsynchronousCloseException();exc = x;} finally {end(); // 释放读锁}// 接收到了客户端的连接且没有发生异常,那么我们调用finishAccept完成接收动作,并返回client端的AsynchronousSocketChannelAsynchronousSocketChannel child = null;if (exc == null) {try {child = finishAccept(newfd, isaa[0], acceptAcc);} catch (Throwable x) {if (!(x instanceof IOException) && !(x instanceof SecurityException))x = new IOException(x);exc = x;}}// 获取当前设置的acceptHandler和acceptAttachment对象并调用我们复写的complete函数CompletionHandler<AsynchronousSocketChannel,Object> handler = acceptHandler;Object att = acceptAttachment;PendingFuture<AsynchronousSocketChannel,Object> future = acceptFuture;enableAccept();if (handler == null) {future.setResult(child, exc);if (child != null && future.isCancelled()) {try {child.close();} catch (IOException ignore) { }}} else {Invoker.invoke(this, handler, att, child, exc);}}
和我们想的一模一样,获取到事件,然后调用我们设置的handler的complete方法。
所以我们发现了什么?
private AsynchronousSocketChannel finishAccept(FileDescriptor newfd,final InetSocketAddress remote,AccessControlContext acc) throws IOException, SecurityException{AsynchronousSocketChannel ch = null;try {// 创建客户端channel,这里也即UnixAsynchronousSocketChannelImpl。注意我们这里发现客户端channel和服务端channel用着一样的portch = new UnixAsynchronousSocketChannelImpl(port, newfd, remote);} catch (IOException x) {nd.close(newfd);throw x;}...return ch;}// 客户端socket channel构造器UnixAsynchronousSocketChannelImpl(Port port,FileDescriptor fd,InetSocketAddress remote)throws IOException{super(port, fd, remote);this.fdVal = IOUtil.fdVal(fd);IOUtil.configureBlocking(fd, false); // 同样将client channel设置为非阻塞try {port.register(fdVal, this); // 保存socket fd和socket channel的映射} catch (ShutdownChannelGroupException x) {throw new IOException(x);}this.port = port;}
我们看到EPollPort通过socketpair函数进行线程通讯,并且读者一定要理解到这里的队列分为两个,JVM底层关于OS的事件,和Java层面处理的事件。
abstract class AsynchronousServerSocketChannelImpl extends AsynchronousServerSocketChannel implements Cancellable, Groupable{// 保存C调用的OS创建的socket fdprotected final FileDescriptor fd;AsynchronousServerSocketChannelImpl(AsynchronousChannelGroupImpl group) {super(group.provider());// 这里其实就是调用了函数 socket(domain, type, 0) 创建了Linux 网络编程的套接字this.fd = Net.serverSocket(true);}}class UnixAsynchronousServerSocketChannelImpl extends AsynchronousServerSocketChannelImpl implements Port.PollableChannel{UnixAsynchronousServerSocketChannelImpl(Port port)throws IOException{super(port); // 初始化父类,我们这里就是AsynchronousServerSocketChannelImpltry {// 设置socket为非阻塞IOUtil.configureBlocking(fd, false);} catch (IOException x) {nd.close(fd); // prevent leakthrow x;}this.port = port;// 上面我们创建的FD最终为Java层面的类FileDescriptor,这里我们需要直接获取到调用socket函数返回的具体值,所以我们通过fdVal拿到socket函数返回的具体值this.fdVal = IOUtil.fdVal(fd);// 最后我们将这个fdVal和咱们当前的UnixAsynchronousServerSocketChannelImpl绑定在一起port.register(fdVal, this);}// PORT中用于绑定fd和channel。其实就是将其保存在一个Map集合中而已protected final Map<Integer,PollableChannel> fdToChannel =new HashMap<Integer,PollableChannel>();final void register(int fd, PollableChannel ch) {fdToChannelLock.writeLock().lock();try {if (isShutdown())throw new ShutdownChannelGroupException();fdToChannel.put(Integer.valueOf(fd), ch);} finally {fdToChannelLock.writeLock().unlock();}}
从上面的源码我们可以总结一下了。


👇推荐关注👇
有趣的行业资讯
干货技术分享
程序员的日常生活
......






