Java 8 parallelStream 使用不当造成高并发环境下性能急剧下降

in Java 8 with 0 commentand 18 read

Java 8 API 添加了一个新的抽象称为流 Stream,可以让你以一种声明的方式处理数据。

Stream 使用一种类似用 SQL 语句从数据库查询数据的直观方式来提供一种对 Java 集合运算和表达的高阶抽象。

Stream API 可以极大提高 Java 程序员的生产力,让程序员写出高效率、干净、简洁的代码。

这种风格将要处理的元素集合看作一种流, 流在管道中传输, 并且可以在管道的节点上进行处理, 比如筛选, 排序,聚合等。

元素流在管道中经过中间操作(intermediate operation)的处理,最后由最终操作(terminal operation)得到前面处理的结果。

+--------------------+       +------+   +------+   +---+   +-------+
| stream of elements +-----> |filter+-> |sorted+-> |map+-> |collect|
+--------------------+       +------+   +------+   +---+   +-------+

以上的流程转换为 Java 代码为:

List<Integer> transactionsIds = 
widgets.stream()
             .filter(b -> b.getColor() == RED)
             .sorted((x,y) -> x.getWeight() - y.getWeight())
             .mapToInt(Widget::getWeight)
             .sum();

更多 Stream 的内容,请参考文末的资料。这里主要讲 parallelStream 使用不当造成的问题。

例子

场景如下,App 端调用后端接口,后端接口需要循环处理一个列表,把它转换为 App 需要的数据,处理列表中的每个项目比较耗时,想着能否并行处理这个列表,缩短处理时间。刚好 Java 8 提供了 parallelStream,实在太好了,但是,噩梦就来了。压测之后发现,并发量一上来,接口性能急剧下降。

假设我们需要处理一个长度为 10 的列表。

private static List<Integer> list = new ArrayList<>();

// 初始化一些数据
static {
    for (int i = 0; i < 10; i++) {
        list.add(i);
    }
}
// 模拟处理数据
private void doSomething() {
    try {
        TimeUnit.MILLISECONDS.sleep(10);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

时间统计类

public class Profiler {
    private static final ThreadLocal<Long> TIME_THREADLOCAL = ThreadLocal.withInitial(() -> System.currentTimeMillis());

    public static void start() {
        TIME_THREADLOCAL.set(System.currentTimeMillis());
    }

    public static void end() {
        long duration = System.currentTimeMillis() - TIME_THREADLOCAL.get();
        System.out.println("执行耗时 " + duration + " ms");
    }
}

单线程环境下

普通 for 循环

@Test
public void testFor() {
    Profiler.start();
    for (Integer integer : list) {
        doSomething();
    }
    Profiler.end();
}

执行耗时 112 ms

Java 8 stream 串行流

@Test
public void testStream() {
    Profiler.start();
    list.stream().forEach(integer -> doSomething());
    Profiler.end();
}

执行耗时 117 ms

Java 8 parallelStream 并行流

@Test
public void testParallelStream() {
    Profiler.start();
    list.parallelStream().forEach(integer -> doSomething());
    Profiler.end();
}

执行耗时 31 ms

从上面的执行耗时可以看出,在单线程环境下,使用 Java 8 并行流确实可以提高程序的性能。

多线程高并发环境下

首先创建一个线程池,初始化 50 个线程。

private static ExecutorService executorService = Executors.newFixedThreadPool(50);

普通 for 循环

@Test
public void testForMultiThreads() throws InterruptedException {
    for (int i = 0; i < 100; i++) {
        executorService.submit(() -> testFor());
    }
    executorService.awaitTermination(1, TimeUnit.DAYS);
}
执行耗时 109 ms
执行耗时 107 ms
执行耗时 109 ms
执行耗时 109 ms
执行耗时 109 ms
执行耗时 109 ms
执行耗时 109 ms
执行耗时 108 ms
执行耗时 108 ms
执行耗时 109 ms
执行耗时 109 ms
执行耗时 109 ms
......

Java 8 stream 串行流

@Test
public void testStreamMultiThreads() throws InterruptedException {
    for (int i = 0; i < 100; i++) {
        executorService.submit(() -> testStream());
    }
    executorService.awaitTermination(1, TimeUnit.DAYS);
}
执行耗时 105 ms
执行耗时 107 ms
执行耗时 107 ms
执行耗时 109 ms
执行耗时 109 ms
执行耗时 108 ms
执行耗时 109 ms
执行耗时 108 ms
执行耗时 110 ms
执行耗时 110 ms
执行耗时 110 ms
执行耗时 110 ms
执行耗时 108 ms
......

Java 8 parallelStream 并行流

@Test
public void testParallelStreamMultiThreads() throws InterruptedException {
    for (int i = 0; i < 100; i++) {
        executorService.submit(() -> testParallelStream());
    }
    executorService.awaitTermination(1, TimeUnit.DAYS);
}
执行耗时 55 ms
执行耗时 71 ms
执行耗时 68 ms
执行耗时 95 ms
执行耗时 94 ms
执行耗时 100 ms
执行耗时 263 ms
执行耗时 167 ms
执行耗时 262 ms
执行耗时 114 ms
执行耗时 111 ms
执行耗时 328 ms
执行耗时 336 ms
执行耗时 340 ms
执行耗时 109 ms
执行耗时 365 ms
执行耗时 114 ms
......
......

从上面的执行耗时可以看出,在多线程环境下,使用普通的 for 循环和串行流与单线程的时候性能相当,但是使用并行流,发现性能会急剧的下降。

为什么

parallelStream 背后其实是使用了 Fork/Join 框架来处理这些数据(也就是 ForkJoinPool ),所以 parallelStream 具有并行处理能力,处理的过程会分而治之,也就是将一个大任务切分成多个小任务,多个线程同时处理这些任务,这样会显著提高处理多数据的效率。

parallelStream 默认使用 ForkJoinPool.commonPool(),但是这个默认的线程池整个程序都会共享使用,而且这个线程池是单点争用的,也就是说在同一个程序里面 listA.parallelStream 和 listB.parallelStream 共享这个默认的线程池,listA 执行完了之后,才能轮到 listB。

在单线程环境下,性能是没有问题的。

但是在多线程环境下,比如 A、B、C、D 都调用 parallelStream()方法,A 争夺到了 commonPool 的使用权,B、C、D 都得等待,而且管理 fork 和 join 任务,对象创建和垃圾回收也会有开销,这就造成了性能的急剧下降。

结论

参考资料

评论