JDK8新特性(三)-聚合操作

参考https://docs.oracle.com/javase/tutorial/collections/streams/

为了更好地理解这篇文章的内容,强烈建议把前两篇介绍lambda表达式和方法引用的文章先看下。

一、聚合操作

依然考虑Person这个类,要打印集合roster中每个Person的名字,先看普通的写法:

for (Person p : roster) {
System.out.println(p.getName());
}

再看使用聚合操作forEach的写法:

roster
.stream()
.forEach(e -> System.out.println(e.getName());

单纯打印信息太简单,看不出聚合操作的优势,接着看:

管道与流

管道就是一系列聚合操作。下面的代码作用是打印所有过滤所有男性成员的名字并打印,其中用到了filter和forEach操作组成的管道:

roster
.stream()
.filter(e -> e.getGender() == Person.Sex.MALE)
.forEach(e -> System.out.println(e.getName()));

跟使用for-each循环的代码进行对比:

for (Person p : roster) {
if (p.getGender() == Person.Sex.MALE) {
System.out.println(p.getName());
}
}

管道包含以下组件:

  • 一个元素源:操作元素的来源,可以是集合、数组、 I/O channel。
  • 若干中间操作。 中间操作会生成新的流。流就是一个元素序列,跟集合的区别在于,流并不是存储元素的数据结构。每一步中间操作产生的新的元素流正好可以供下一步操作使用。
  • 一个终端操作。像forEach这种终端操作,不会生成流,而会生成一个基本类型数据、一个集合、或者什么都不生成。本例中的foreach接收一个lambda表达式,表达式做的操作只是打印Person的名字。

再看一个例子,计算所有男性成员的平均年龄:

double average = roster
.stream()
.filter(p -> p.getGender() == Person.Sex.MALE)
.mapToInt(Person::getAge)
.average()
.getAsDouble();

mapToInt操作生成了新的流:IntStream,这是通过接收一个lambda表达式(e -> e.getAge())或方法引用(Person::getAge)来完成一个流到另一个流的转换。 average操作计算上一步中IntStream的元素的平均值,并返回一个OptionalDouble对象,最终调用getAsDouble获取一个double。

聚合操作与迭代器的区别

像forEach这种聚合操作,很像迭代器Iterator,但是他们之间还是有几点不同:

  • 聚合操作不提供next方法,对元素的迭代对调用者不可见,调用者只能决定对哪个集合进行迭代,但不能决定如何迭代。而迭代器Iterator对元素的迭代是暴露给调用者的,调用者可以决定对哪个集合进行迭代,也可以决定如何进行迭代。外部迭代只能串行执行,而内部迭代没有这种限制,可以并行执行提高效率。
  • 聚合操作的元素是从流(stream)获取的,迭代器操作的元素从集合获取。
  • 聚合操作可以接收lambda表达式,来指定如何操作元素。

二、化简操作

上个例子中的average()操作也叫化简操作,就是 JDK中还包含了 average, sum, min, max, count等一系列化简操作,就是把元素流通过某种规则转化成一个结果值, 比如计算平均值、最大值、最小值、和值、个数统计,这样的操作叫化简操作。除了上面提到的几种完成特定功能的操作,JDK还提供了通用的化简操作Stream.reduce和Stream.collect方法。

Stream.reduce

以计算男性成员的年龄和为例,完成这个功能即可以使用Stream.sum方法:

Integer totalAge = roster
.stream()
.mapToInt(Person::getAge)
.sum();

也可以使用Stream.reduce方法:

Integer totalAgeReduce = roster
.stream()
.map(Person::getAge)
.reduce(
0,
(a, b) -> a + b);

reduce方法接收两个参数,含义是:

  • 操作元素: 方法的初始值、默认返回值
  • 累加器: 表达式“(a, b) -> a + b”中a代表中间值,b代表下一个操作数,“a+b”作为新的中间值进行下一步计算

reduce方法在操作每一个元素时,总是返回一个新值。某些复杂场景下,这样做效率很低下。

Stream.collect

与reduce操作总是返回新值不同,collect操作是在原值基础上进行修改。 下面考虑计算平均值这个场景,需要两个数据:总元素个数、元素和值。

class Averager implements IntConsumer
{ private int total = 0;
private int count = 0;

public double average() {
    return count > 0 ? ((double) total)/count : 0;
}

public void accept(int i) { total += i; count++; }
public void combine(Averager other) {
    total += other.total;
    count += other.count;
}

}

使用方法如下:

Averager averageCollect = roster.stream()
.filter(p -> p.getGender() == Person.Sex.MALE)
.map(Person::getAge)
.collect(Averager::new, Averager::accept, Averager::combine);

System.out.println(“Average age of male members: “ +
averageCollect.average());

collect方法接收三个参数:

  • supplier: supplier就是工厂方法,用来生成一个容器类,保存collect方法的返回值,本例中是一个Averager实例.
  • accumulator: accumulator方法把stream中的元素处理后放入supplier生成的结果容器。
  • combiner: combiner方法是用来把两个结果容器合并成一个结果返回。

尽管JDK提供了average聚合操作来计算stream中元素的平均值,但自定义的collect方法更灵活,适用于更复杂的计算。 再看一个例子:

List namesOfMaleMembersCollect = roster
.stream()
.filter(p -> p.getGender() == Person.Sex.MALE)
.map(p -> p.getName())
.collect(Collectors.toList());

这次只有一个参数,Collectors这个类封装了很多实用操作,就不需要传入上面说的三个参数。 下面看一个按性别分组的例子:

Map<Person.Sex, List> byGender =
roster
.stream()
.collect(
Collectors.groupingBy(Person::getGender));

再看一个按性别分组,但是只返回姓名的例子,这个有点复杂:

Map<Person.Sex, List> namesByGender =
roster
.stream()
.collect(
Collectors.groupingBy(
Person::getGender,
Collectors.mapping(
Person::getName,
Collectors.toList())));

按性别分类计算总年龄:

Map<Person.Sex, Integer> totalAgeByGender =
roster
.stream()
.collect(
Collectors.groupingBy(
Person::getGender,
Collectors.reducing(
0,
Person::getAge,
Integer::sum)));

按性别分类计算平均年龄:

Map<Person.Sex, Double> averageAgeByGender = roster
.stream()
.collect(
Collectors.groupingBy(
Person::getGender,
Collectors.averagingInt(Person::getAge)));

上面提到的这些还是要实际使用几次,熟练了就不晕了。

并行计算

并行计算就是把父问题分解为子问题,多线程同时解决,然后合并子问题的结果。尽管JDK提供了并行计算fork/join框架, 但是还是需要手动指定如何分解问题。比起并行聚合操作来说,还是太麻烦。 自己实现并行计算的难点在于,要考虑线程安全的集合操作,不如使用JDK现成的解决方案方便。并行计算能不能提高程序运行效率取决于数据规模和CPU数量,具体场景具体分析。

并行处理Streams

Collection.parallelStream或者BaseStream.parallel可以创建并行stream。再看一个例子,并行计算男性平均年龄:

double average = roster
.parallelStream()
.filter(p -> p.getGender() == Person.Sex.MALE)
.mapToInt(Person::getAge)
.average()
.getAsDouble();

并发Reduction

串行按年龄分组

Map<Person.Sex, List> byGender =
roster
.stream()
.collect(
Collectors.groupingBy(Person::getGender));

并行按年龄分组

ConcurrentMap<Person.Sex, List> byGender =
roster
.parallelStream()
.collect(
Collectors.groupingByConcurrent(Person::getGender));

顺序

下面讨论管道处理流中元素的顺序。先看一个打印元素的例子:

Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8 };
List listOfIntegers =
new ArrayList<>(Arrays.asList(intArray));

System.out.println(“listOfIntegers:”);
listOfIntegers
.stream()
.forEach(e -> System.out.print(e + “ “));
System.out.println(“”);

System.out.println(“listOfIntegers sorted in reverse order:”);
Comparator normal = Integer::compare;
Comparator reversed = normal.reversed();
Collections.sort(listOfIntegers, reversed);
listOfIntegers
.stream()
.forEach(e -> System.out.print(e + “ “));
System.out.println(“”);

System.out.println(“Parallel stream”);
listOfIntegers
.parallelStream()
.forEach(e -> System.out.print(e + “ “));
System.out.println(“”);

System.out.println(“Another parallel stream:”);
listOfIntegers
.parallelStream()
.forEach(e -> System.out.print(e + “ “));
System.out.println(“”);

System.out.println(“With forEachOrdered:”);
listOfIntegers
.parallelStream()
.forEachOrdered(e -> System.out.print(e + “ “));
System.out.println(“”);

输出如下:

listOfIntegers:
1 2 3 4 5 6 7 8
listOfIntegers sorted in reverse order:
8 7 6 5 4 3 2 1
Parallel stream:
3 4 1 6 2 5 7 8
Another parallel stream:
6 3 1 5 7 8 4 2
With forEachOrdered:
8 7 6 5 4 3 2 1

解释一下上面的输出:

  • 第一个就是按元素的初始化顺序进行遍历输出,没什么好说的。
  • 第二个是使用了排序方法Collections.sort排序后,遍历输出。这次是有顺序的。
  • 第三、第四个是使用并行流输出,两次打印元素的顺序完全随机,是运行时jvm随机决定的,以便能最大化并行的优势。
  • 第五个是调用了forEachOrdered方法,这次输出就有顺序了,但是运行效率可能不如并行流。

副作用

干扰

当管道在处理stream中的元素时,如果stream源发生了变化,就会报错。下面这个例子,尝试连接所有listOfStrings集合中的string,但是运行这段代码,会抛出ConcurrentModificationException:

try {
List listOfStrings =
new ArrayList<>(Arrays.asList(“one”, “two”));

// This will fail as the peek operation will attempt to add the
// string "three" to the source after the terminal operation has
// commenced. 

String concatenatedString = listOfStrings
    .stream()

    // Don't do this! Interference occurs here.
    .peek(s -> listOfStrings.add("three"))

    .reduce((a, b) -> a + " " + b)
    .get();

System.out.println("Concatenated string: " + concatenatedString);

} catch (Exception e) {
System.out.println(“Exception caught: “ + e.toString());
}

有状态的Lambda表达式

Avoid using stateful lambda expressions as parameters in stream operations. 有状态的lambda表达式就是最终返回值会被管道中的操作所影响。下面这个例子,是把listOfIntegers中的元素通过map方法复制到另一个list中,分别使用串行流与并行流

List serialStorage = new ArrayList<>();

System.out.println(“Serial stream:”);
listOfIntegers
.stream()

// Don't do this! It uses a stateful lambda expression.
.map(e -> { serialStorage.add(e); return e; })

.forEachOrdered(e -> System.out.print(e + " "));

System.out.println(“”);

serialStorage
.stream()
.forEachOrdered(e -> System.out.print(e + “ “));
System.out.println(“”);

System.out.println(“Parallel stream:”);
List parallelStorage = Collections.synchronizedList(
new ArrayList<>());
listOfIntegers
.parallelStream()

// Don't do this! It uses a stateful lambda expression.
.map(e -> { parallelStorage.add(e); return e; })

.forEachOrdered(e -> System.out.print(e + " "));

System.out.println(“”);

parallelStorage
.stream()
.forEachOrdered(e -> System.out.print(e + “ “));
System.out.println(“”);

“e -> { parallelStorage.add(e); return e; }”就是一个有状态的lambda表达式。输出结果是随机的。

Serial stream:
8 7 6 5 4 3 2 1
8 7 6 5 4 3 2 1
Parallel stream:
8 7 6 5 4 3 2 1
1 3 6 2 4 5 8 7