暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

使用并行流(ParallelStream)避坑

码酱 2021-02-16
6085

在我们经常写的业务代码中很多时候会出现遍历循环的情况,比如取集合数据、封装集合数据等等,这是我们不能避免的。在jdk1.8中给我们提供了stream;

在很多时候普通的for循环就够了,因为数据量不大的情况下,jdk底层对它的优化是非常好的。所以看情况而定,不是说所有的循环都要用流遍历。大数据量的遍历用parallelStream可以比普通遍历节省一半的时间,这个亲测过。

在使用stream.foreach时这个遍历没有线程安全问题,但是使用parallelStream就会有线程安全问题,所有在parallelStream里面使用的外部变量,比如集合一定要使用线程安全集合,不然就会引发多线程安全问题。

Java8并行流ParallelStream和Stream的区别就是支持并行执行,提高程序运行效率。但是如果使用不当可能会发生线程安全的问题。Demo如下:
public static void testParallelStream() {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
list.add(i);
}
List<Integer> parallelList = new ArrayList<>();
list.parallelStream().filter(i -> i % 2 == 0).forEach(i -> parallelList.add(i));
System.out.println();
parallelList.stream().forEachOrdered(e -> System.out.print(e + " "));
System.out.println();
System.out.println("Sleep 5 sec");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
parallelList.stream().forEachOrdered(e -> System.out.print(e + " "));
}
程序运行结果如下:
66 62 64 72 74 68 70 56 58 60 54 50 52 6 8 10 4 0 2 90 92 82 null null 88 78 80 96 76 98 94 16 12 14 18 22 20 24 32 44 28 26 30 34 36 48 40 38 46 
Sleep 5 sec
66 62 64 72 74 68 70 56 58 60 54 50 52 6 8 10 4 0 2 90 92 82 null null 88 78 80 96 76 98 94 16 12 14 18 22 20 24 32 44 28 26 30 34 36 48 40 38 46
除了以上在ForEach里面添加集合元素会出现问题,以下这种方式也会:
list.parallelStream().map(e -> { parallelList.add(e);return e; }).forEachOrdered(e -> System.out.print(e + " "));

  • 两个问题:
    1.为什么parallelList会有null元素?
    2.为什么parallelList的大小不固定?

最初我以为是因为主线程执行完成后并行流中的线程并未结束,sleep了主线程后发现结果并没有发生改变,其实我们可以认为ArrayList内部维护了一个数组arr其定义一个变量 n用以表式这个数组的大小那么向这个ArrayList中存储数据的过程可以分解为这么几步:

1.读取数组的长度存入n

2.向这个数组中储入元素arr[n]=a

3.将n+1

4.保存n
而对于parallelList元素数量不固定的原因就是多线程有可能同时读取到相同的下标n同时赋值,这样就会出现元素缺失的问题了。
如何解决这个问题呢?


方案一:我们可以将其转化为一个同步集合也就是

List<Integer> parallelList = Collections.synchronizedList(new ArrayList<>());
  • 在使用并行流的时候是无法保证元素的顺序的,也就是即使你用了同步集合也只能保证元素都正确但无法保证其中的顺序。
  • 顺便提一嘴,forEachOrderedforEach的区别就是forEach是并行处理的,forEachOrdered是按顺序处理的,显然前者速度更快。
    //输出的顺序不一定(效率更高)
    Stream.of("AAA", "BBB", "CCC").parallel().forEach(s -> System.out.println("Output:" + s));
    //输出的顺序与元素的顺序严格一致
    Stream.of("AAA", "BBB", "CCC").parallel().forEachOrdered(s -> System.out.println("Output:" + s));


方案二:那就是最后调用collect(Collectors.tolist()),这种收集起来所有元素到新集合是线程安全的。

Demo如下:

public static void testParallelStream() {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
list.add(i);
}
List<Integer> parallelList = list.parallelStream().filter(i -> i % 2 == 0).collect(Collectors.toList());
System.out.println();
parallelList.stream().forEachOrdered(e -> System.out.print(e + " "));
System.out.println();
System.out.println("Sleep 5 sec");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
parallelList.stream().forEachOrdered(e -> System.out.print(e + " "));
}

程序运行结果如下:

0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38 40 42 44 46 48 50 52 54 56 58 60 62 64 66 68 70 72 74 76 78 80 82 84 86 88 90 92 94 96 98 
Sleep 5 sec
0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38 40 42 44 46 48 50 52 54 56 58 60 62 64 66 68 70 72 74 76 78 80 82 84 86 88 90 92 94 96 98
不光没有出现null和数量不一致问题,还排序了!所以,在采用并行流收集元素到集合中时,最好调用collect方法,Foreach方法或者map方法慎用!
文章转载自码酱,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论