暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

线程异步组合&简单线程池

研程序笔记 2021-08-04
310
  • 一、入门使用

  • 二、异步&线程池

    • 1、创建异步对象提交任务runAsync、supplyAsync

    • 2、whencomplete接收任务返回值

    • 3、handle方法【对返回值进行加工,再返回】

    • 4、线程串行化方法【then】

    • 5、两任务组合–-都要完成

    • 6、两任务组合-一个完成

    • 7、多任务组合【总结】

    • 1、初始化线程的4种方式

    • 2、线程池execute和submit区别

    • 3、创建线程池的方式

    • 4、常见的4种线程池

    • 5、使用线程池的好处

    • 6、CompletableFuture异步编排


一、入门使用

    CompletableFuture ff = new CompletableFuture();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("begin.....");
ff.complete("返回的及俄国");
}
}).start();

System.out.println(ff.get());

异步1

  CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("!!!!!!!!!!" + Thread.currentThread().getId());
int i = 10 / 0;
return 5;
//执行完成以后操作
}, threadPool).whenComplete((res, exection) -> {
System.out.println("KK" + res + "---------" + exection);
}).exceptionally(throwable -> {
//异常以后,返回默认值,异常处理
return 10;
});


CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("!!!!!!!!!!" + Thread.currentThread().getId());
int i = 10 / 2;
return 5;
//执行完成以后操作
}, threadPool).handle((resulr, threable) -> {

if (resulr != null) {
return resulr * 2;
}
if (threable != null) {
System.out.println(threable);
return 0;
}
return 0;
});
System.out.println(completableFuture1.get() + "haha");

二、异步&线程池

为了提高性能,一些服务会写成异步的



1、初始化线程的4种方式

1、实际开发中,只用线程池【高并发状态开启了n个线程,会耗尽资源】
2、当前系统中线程池只有一两个,每个异步任务提交给线程池让他自己去执行


1)、继承Thread
2)、实现 Runnable接口
3)、实现 Callable接口+FutureTask(可以拿到返回结果,可以处理异常)
4)、线程池

区别;
12不能得到返回值。3可以获取返回值
123都不能控制资源
4可以控制资源,性能稳定,不会一下子所有线程一起运行



2、线程池execute和submit区别

execute:参数只能是Runnable,没有返回值
submit:参数可以是RunnableCallable,返回值是FutureTask



3、创建线程池的方式

1、创建一个固定类型的线程池
Executors.newFixedThreadPool(10);

2、直接创建,7个参数
new ThreadPoolExecutor(corePoolSize,maximumPoolSizekeepAliveime,TimeUnitunit,
workQueue,threadFactory,handler);
corePoolSize:核心线程数,一直存在,一开始只是new 并没有start
maximumPoolSize:最大线程数量,控制资源
keepAliveime maximumPoolSize-corePoolSize 超过空闲时间释放线程】
TimeUnitunit:时间单位
workQueue 阻塞队列,只要有线程空闲,就会去队列取出新的任务执行
threadFactory:线程的创建工厂【可以自定义】
RejectedExecutionHandler handler:拒绝策略

3、顺序:
1、先创建核心线程运行任务
2、核心线程满放入阻塞队列
new LinkedBlockingDeque()默认是Integer的最大值,
3、阻塞队列满了继续创建线程,最多创建maximumPoolSize
4、如果传入了拒绝策略会执行,否则抛出异常
5、拒绝策略:
1、丢弃最老的 Rejected
2、调用者同步调用,直接调用run方法,不创建线程了 Caller
3、直接丢弃新任务 Abort 【默认使用这个】
4、丢弃新任务,并且抛出异常 Discard



4、常见的4种线程池

1CachedThreadPool:核心线程数是0,如果空闲会回收所有线程【缓存线程池】
2FixedThreadPool:核心线程数 = 最大线程数,【不回收】
3ScheduledThreadPool:定时任务线程池,多久之后执行【可提交核心线程数,最大线程数是Integer.Max
4SingleThreadPool:核心与最大都只有一个【不回收】,后台从队列中获取任务



5、使用线程池的好处

1、降低资源的消耗【减少创建销毁线程的开销】
通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗

2、提高响应速度【控制线程个数】
因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行

3、提高线程的可管理性【例如系统中可以创建两个线程池,核心线程池、非核心线程池【短信等】,关闭非核心线程池释放内存资源】
线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配



6、CompletableFuture异步编排

1、异步调用 要编排好顺序

下例:456依赖1的结果

2、实现了Future,可以获得异步调用的结果
很像vuePromise,执行完一个ajax请求继续then执行下一个


1、创建异步对象提交任务runAsync、supplyAsync

CompletableFuture提供了四个静态方法来创建一个异步操作。

1public static Completab1eFuture<Void> runAsync(Runnable runnable)
2public static completableFuturecVoid> runAsync(Runnable runnableExecutor executor)
3public static <U> CompletableFuture<U> supplyAsync(Supplier<U>supplier)
4public static <U> CompletableFuturecU> supplyAsync(Supplier<U> supplierExecutor
executor)

1runXXX没有返回结果,supplyXxx有返回结果
2、可以传入自定义线程池,否则使用默认线程池

public class ThreadTest {
public static ExecutorService excutor = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main start ....");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(ThreadTest::asyncFunction, excutor);
System.out.println("main end.... result: " + future.get());
}

public static int asyncFunction() {
System.out.println("线程池执行任务1 1+1");
return 1 + 1;
}
}


2、whencomplete接收任务返回值

总结:一般用handle,因为whencomplete如果异常不能给定默认返回结果,需要再调用exceptionally,而handle可以

该方法作用:获得前一任务的返回值【自己也可以是异步执行的】,也可以处理上一任务的异常,调用exceptionally修改前一任务的返回值【例如异常情况时给一个默认返回值】而handle方法可以简化操作
public completableFuture<T> whencomplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T>whenCompleteAsync(BiConsumer <? super T,? super Throwable>
action);
public completableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable>
actionExecutor executor);
public completableFuture<T>exceptionally(Function<Throwable,? extends T> fn);


whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。
whenCompletewhenCompleteAsync的区别:
whenComplete:是执行当前任务的线程执行继续执行whenComplete的任务。
whenCompleteAsync:是执行把 whenCompleteAsync这个任务继续提交给线程池
来进行执行。
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程
执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

public class ThreadTest {
public static ExecutorService excutor = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main start ....");
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(ThreadTest::asyncFunction, excutor)
.whenComplete(ThreadTest::accept)
.exceptionally(ThreadTest::apply);
System.out.println("main end.... 返回值:" + future.get());
}

public static int apply(Throwable exception) {
System.out.println("获取任务1的异常,并提供一个默认返回值" + exception);
return 100;
}


public static void accept(Integer result, Throwable exception) {
System.out.println("获取任务1的结果:" + result);
System.out.println("获取任务1的异常:" + exception);
}

public static int asyncFunction() {
System.out.println("线程池执行任务1 10 / 0");
return 10 / 0;
}
}

main start ....
线程池执行任务1 10 / 0
获取任务1的结果:null
获取任务1的异常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
获取任务1的异常,并提供一个默认返回值java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
main end.... 返回值:100



3、handle方法【对返回值进行加工,再返回】

总结:使用R apply(T t, U u); 可以感知异常,和修改返回值的功能。
public <U> completionStage<U> handle(BiFunction<? super TThrowable? extends U> fn);
public <U> completionStage<U>handleAsync(BiFunction<? super TThrowable? extends U>
fn);
public > CompletionStage<U> handleAsync(BiFunction<? super TThrowable? extends U>
fn,Executor executor ) ;
complete一样,可对结果做最后的处理(可处理异常),可改变返回值。


public class ThreadTest {
public static ExecutorService excutor = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main start ....");
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(ThreadTest::task, excutor)
.whenComplete(ThreadTest::accept)
.exceptionally(ThreadTest::apply)
.handle(ThreadTest::handle);
System.out.println("main end.... 返回值:" + future.get());
}

public static int handle(Integer result, Throwable exception) {
System.out.println("获取任务1的结果:" + result);
System.out.println("获取任务1的异常:" + exception);
System.out.println("异常不会传播,前面调用exceptionally方法处理了异常");
return 200;
}

public static int apply(Throwable exception) {
System.out.println("获取任务1的异常,并提供一个默认返回值" + exception);
return 100;
}


public static void accept(Integer result, Throwable exception) {
System.out.println("获取任务1的结果:" + result);
System.out.println("获取任务1的异常:" + exception);
}

public static int task() {
System.out.println("线程池执行任务1 10 / 0");
return 10 / 0;
}
}


4、线程串行化方法【then】

thenRun:继续执行,不接受上一个任务的返回结果
thenAccept:继续执行,接受上一个任务的返回结果
thenApply:继续执行,感知上一任务的返回结果,并且自己的返回结果也被下一个任务所感知
public <U> CompletableFuturecU> thenApply(Function<? super T,? extends U> fn)
public <U> Completab1eFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn
Executor executor)
public completionstage<Void> thenAccept(Consumer<? super T> action);
public completionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStagecVoid> thenAcceptAsync(Consumer<? super T> action,Executor
executor);
public Completionstage<Void> thenRun(Runnable action);
public Completionstage<Void> thenRunAsync(Runnable action);
public completionStage<Void> thenRunAsync(Runnable action,Executor executor);

打印结果:
main start ....
任务1启动 10 / 0
获取任务1的结果:null
获取任务1的异常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
获取任务1的异常,并提供一个默认返回值java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
获取任务1的结果:100
获取任务1的异常:null
异常不会传播,前面调用exceptionally方法处理了异常
任务2启动 1 + 1 =
获取任务1的结果:200
main end.... 返回值:2

public class ThreadTest {
public static ExecutorService excutor = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main start ....");
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(ThreadTest::task, excutor)
.whenComplete(ThreadTest::accept)
.exceptionally(ThreadTest::apply)
.handle(ThreadTest::handle)
.thenApply(ThreadTest::thenApply);
System.out.println("main end.... 返回值:" + future.get());
}

public static int thenApply(Integer result) {
System.out.println("任务2启动 1 + 1 = ");
System.out.println("任务2获取任务1的结果:" + result);
return 2;
}

public static int handle(Integer result, Throwable exception) {
System.out.println("获取任务1的结果:" + result);
System.out.println("获取任务1的异常:" + exception);
System.out.println("异常不会传播,前面调用exceptionally方法处理了异常");
return 200;
}

public static int apply(Throwable exception) {
System.out.println("获取任务1的异常,并提供一个默认返回值" + exception);
return 100;
}


public static void accept(Integer result, Throwable exception) {
System.out.println("获取任务1的结果:" + result);
System.out.println("获取任务1的异常:" + exception);
}

public static int task() {
System.out.println("任务1启动 10 / 0");
return 10 / 0;
}
}



5、两任务组合–-都要完成

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);


public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);


public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);

两个任务必须都完成,触发该任务。
thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有
返回值。
runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,
处理该任务。



demo

public class ThreadTest {
public static ExecutorService excutor = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main start ....");
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->{
System.out.println("任务1 start..");
System.out.println("任务1 end..");
return 1+1;
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
System.out.println("任务2 start..");
System.out.println("任务2 end..");
return "hello";
});

CompletableFuture<String> future3 = future1.thenCombineAsync(future2, (result1, result2) -> {
return "任务3 :组合前两个任务的返回值返回 --" + result1 + "---" + result2;
}, excutor);
System.out.println("main end.... 返回值:" + future3.get());
}
}


6、两任务组合-一个完成

1applyToEitherFunction 带参有返回值【能获取前面任务的结果,自己有返回结果】【成功的那个任务的结果】
2acceptEitherConsumer 带参无返回值【能获取前面任务的结果,自己没有返回结果】
3runAfterEitherRunnable 无参无返回值【不能获取前面任务的结果,自己也没有返回结果】
supplyAsyncSupplier:无参有返回值【不能获取前面任务的结果,自己有返回值】
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(null, other, fn);
}

public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(asyncPool, other, fn);
}

public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor) {
return orApplyStage(screenExecutor(executor), other, fn);
}

public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(null, other, action);
}

public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(asyncPool, other, action);
}

public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor) {
return orAcceptStage(screenExecutor(executor), other, action);
}

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action) {
return orRunStage(null, other, action);
}

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action) {
return orRunStage(asyncPool, other, action);
}

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return orRunStage(screenExecutor(executor), other, action);
}
public class ThreadTest {
public static ExecutorService excutor = Executors.newFixedThreadPool(10);

public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main start ....");

CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(()->{
System.out.println("任务1 start..");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1 end..");
return 1+1;
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
System.out.println("任务2 start..");
System.out.println("任务2 end..");
return "hello";
});

// 两个任务都完成才执行任务3
// CompletableFuture<String> future3 = future1.thenCombineAsync(future2, (result1, result2) -> {
// return "任务3 :组合前两个任务的返回值返回 --" + result1 + "---" + result2;
// }, excutor);

// 任一任务完成就可以执行任务3【返回值是future1的泛型】
CompletableFuture<String> future3 = future1.applyToEitherAsync(future2, (result) -> {
return "任务3 :组合先执行完的任务的结果 --" + result;
}, excutor);
System.out.println("main end.... 返回值:" + future3.get());
}
}



7、多任务组合【总结】

1、等待所有任务完成
2、只有一个任务完成
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}



老师的文档:

其实越急躁,越透露出自己的无知。一次平稳的交流创造出非常不安的情绪不是一个好的交流对象,交流应该是舒服的。当你发现自己很急躁很压迫对方的时候,说明你在这个知识点的欠缺并且急迫让对方来缓解你的漏缺,没有人会为你的无知买单,不要等着靠别人来教会我们


文章转载自研程序笔记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论