今天我们说说Java中的多线程。
这个主题中,绝大多数内容在Java5就已经就绪了,只有少数的特性是在Java7以后加入的,其中完整的功能只有在Java7中加入的Fork/Join框架。

也许您会问,Java7的东西还算新优特性么?我觉得还真算,因为到今天,真正能用好Fork/Join多线程框架的Java程序也不算多。而且更重要的是Java8加入的Stream特性中的并行流,正是基于Fork/Join框架的。
因为多线程并发编程是Java语言及其重要的知识点,我会把多线程相关的知识点都梳理一边,只是Java7以前的知识就不再展开讲述了,只对Java7以后加入的部分进行叙述。
1. Thread
Java的线程模型自从Java语言诞生就存在了,这也是当年Sun公司重点宣传的内容,要知道那时c语言连pthread.h都没有统一。所以能用Thread.start()方便的启动新线程,以及synchronized对方法或者代码块加锁,无疑大大节省了程序员的工作量。
线程之间操作对象的并发操作,是由Java一切类的根本Object类中的方法,wait/notify/notifyAll所提供的。对象执行某个线程可以停下来等待,其他线程可以通知它继续工作。
但是线程启动之后如何控制呢?又如何更好地处理并发?直到并发包给出了解答。
2. JSR 166
Java5发布时,并发包java.util.concurrent被加入。这是大神级纽约州立大学Doug Lea教授开发的高效能线程并发处理包,由JSR 166定义的规范和实现。Doug Lea目前还是OpenJDK的治理委员会成员,可以说是对Java最有影响力的人之一。
Java并发包加入后,Java语言对于线程的控制能力立刻得到极大的加强。
我们从几方面去看:
a. 执行程序接口。加入了Executor,ExecutorService接口,可以立刻执行,提交稍候执行线程,也可以对已经执行的线程进行终止。
b. 执行程序实现。ThreadPoolExecutor和ScheduleThreadPoolExector提供了可以调节的,灵活的线程池。
c. 同步器。Semaphore信号量用来获取和释放资源。CountDownLatch用于给定数目信号前执行的阻塞。CyclicBarrier用于屏障线程的执行。Exchange可以和伙伴线程交换对象。
d. Condition和Lock, ReadWriteLock接口是synchronized关键字的细粒度面向对象实现,而ReentrantReadWriteLock实现了可重入的读写锁。AbstractQueuedSynchronizer是并发包中一切的根本,提供了一个FIFO队列的框架,提供的方法有独占方式和共享方式获取对象。
e. 另外还有支持线程安全的数据结构集合,如ConcurrentHashMap, CopyOnWriteArrayList,还有新增加的Queue,Deque接口的实现,如ConrrentLinkQueue, ArrayBlockQueue实现等。
f. java.util.concurrent.atomic包中的类,对类型数值进行原子级别的更新操作。
3. Fork/Join
Fork/Join框架就是采用“分而治之”的思想,将大任务分拆成小任务,分配给多个线程执行,合并后得到最终结果,加快整个系统的处理速度。
我们生活中,绝大多数复杂的工作都是用这样的思想拆分的。将大的任务分拆为小的任务,委派给其他人去处理。

Fork/Join是在Java7中被加入的,那时正是Hadoop技术如火如荼的时候。在Java8,Fork/Join又进行了不断的完善,目前成为Stream并行处理的线程基础类,可以说是Java中非常重要特性。
ForkJoinPool是运行时核心线程池,其中包含了若干个工作线程,每个工作线程有一个双向队列(deque),里面存放了工作任务,当一个工作线程完成处理其任务后,使用工作窃取(work-stealing)算法来从其他线程的工作队列的末尾去取得一些工作任务,帮助“队友”继续完成,这也是我们日常所说的“具有主人翁团队精神”。比起无任何处理的普通线程池,这样的设计无疑提高了整个系统的工作效率。
Java8后,ForkJoinPool加入静态commonPool()方法,可以直接获得一个优化的线程池。
交给工作线程处理的就是工作任务了,用抽象类ForkJoinTask来表示。
提交给ForkJoinPool有很多种方式,归类可以为:
execute:异步提交任务
invoke:提交并等待返回任务结果
submit:提交任务并返回Future
通过选用不同的提交方法,可以灵活的处理各种情况。
ForkJoinTask是一个抽象类,实现类有:
RecursiveAction:用于没有返回结果的任务,处理方法为void computer()。
RecursiveTask:用于有返回结果的任务,完成后需要告知运算后的结果,处理方法为T computer(),泛型T可以声明为需要的类型。
CountedCompleter:这个类可以在跟踪任务的进行过程,具有pending任务的概念,每次当一个子任务要加入时,addToPendingCount方法被调用,内部的技术器被修改。这个计数器决定着这个任务是否已经完成。当方法tryComplete()调用时,完成事件会被发往所有监听onCompletion回调方法。这样就给我们提供了更细腻的控制方式。比起上面两种的分治方法,我们可以在完成单个任务后具体做这个任务的相关善后操作,不用另外编程就可以获得任务的计数,还可以通过getRoot方法获得根计算任务,利用firstComplete和nextComplete遍历所有计数任务。
在开发时,有三个关键的技术关键实现点:
a. 启动新的任务时机。一般是设定一个阈值,当一个任务工作量超过这个值,就启动一个新的工作任务。
b. 执行任务运算,即computer方法,根据是否带出运算结果,以及是否要细粒度控制任务进行来选用不同的ForkJoinTask实现类。
c. 归并结果,join等待所有结果并计算。
任务在执行过程中,可以被取消cancel。也会中间出错,getException方法可以查看异常情况。
4. 扩展更新
a. Phaser
Phaser是JDK7加入的工具,表示“阶段器”。功能上与CyclicBarrier、CountDownLatch有些重叠,而更加灵活强大。
在Phaser中,把多个线程协作执行的任务划分为多个阶段,编程时要明确各个阶段的任务,每个阶段都可以有任意个参与者,线程都可以随时注册并参与到某个阶段。
Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才能进行下一步。当我们有并发任务并且需要分解成几步执行的时候,这种机制就非常适合。 CyclicBarrier CountDownLatch 只能在构造时指定参与量,而Phaser可以动态的增减参与量。
register()把一个新的参与者注册到phase中
arrive()这个方法通知Phase对象一个参与者已经完成了当前阶段
awaitAdvance(int phase):如果传入的阶段与当前阶段一致,会将当前线程休眠,直到这个阶段的所有参与者都运行完成
arriveAndAwaitAdvance() 等待参与者达到指定数量,才继续运行
arriveAndDeregister()告知phaser对应的线程已经完成了当前阶段,并它不会参与到下一阶段的操作中
另外Phaser还支持中断Termination和层次结构Tiering
通过这种可以控制当前阶段工作线程个数,以及是否需要停止等待的注册机制,可以方便的对线程进行并发操作。
b. StampedLock
StampedLock是JDK8中加入的一个新的锁,其实它可以通过方法获取到三种不同种类的锁,分别是:
写锁writeLock,独占锁,同时只有一个线程可以获取该锁。当一个线程获取该锁后,其它请求的线程必须等待
悲观读锁readLock,共享锁,在没有线程获取独占写锁的情况下,同时多个线程可以获取该锁,如果已经有线程持有写锁,其他线程请求获取该读锁会被阻塞。
乐观读锁tryOptimisticRead,若读的操作很多,写的操作很少的情况下,可以乐观地认为,写入与读取同时发生几率很少。即便有线程在读,也不用悲观地使用完全的读取锁定。程序读取数据后,可以查看是否有写入的变更,再采取相应操作,比如重读取信息,或者抛出异常。在对事务要求不高的场景下,比如读取点赞数,数据脏读是没有什么关系的,这时就可以使用乐观读锁,只对写入操作进行数据加锁同步操作。有一个方法validate可以对时间戳进行校验。同时支持读锁和写锁的相互转换
StampedLock适用于读多写少的场景,能够极大的提高读取存储的吞吐量。
~~~~
并发线程就先到这里吧,关于高级特性Flow和CompletionFuture另外写文章讲述。
祝大家国庆节快乐。




