Java 8 你需要掌握的新特性 4、并发增强
4.1 概述
这部分用的不多,仅简单记录一下新特性
4.2 原子值
4.2.1 更新方法
Java 8 增加了两个新方法 updateAndGet 和 accumulateAndGet 用于更新原子类的值,取代老API的循环方式。
AtomicInteger ai = new AtomicInteger();
// 多线程环境下安全更新
Integer i = ai.updateAndGet(x -> 5);
System.out.println(i); // 输出 5
// 多线程环境下安全更新 将原子值和传入的参数组合
ai.accumulateAndGet(1, (oldValue, paramValue) -> oldValue + paramValue);
System.out.println(ai.get()); // 输出 6
4.2.2 LongAdder
如果程序内有高度的竞争,大量的线程访问同一个原子值,可以使用 LongAdder 和 LongAccumulator,这个类是 Java 8 提供用于在高度竞争环境下替代 AtomicLong 的。
LongAdder adder = new LongAdder();
adder.add(2);
System.out.println(adder.intValue()); // 输出 2
adder.increment();
System.out.println(adder.sum()); // 输出 3 sum方法返回long型
// 同 accumulateAndGet 方法, 将原子值和传入的参数组合
LongAccumulator la = new LongAccumulator((left, right) -> left + right, 10);
System.out.println(la.intValue()); // 输出 10
4.3 ConcurrentHashMap
额外提一点,在 1.5 版本, ConcurrentHashMap 存在死锁的可能(具体源码就不再分析了),1.6 版本以后修复了这个问题。
4.3.1 更新值
一个误区eg: 对关键字计数
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
Integer oldValue = map.get("key");
Integer newValue = oldValue == null ? 1 : oldValue + 1;
map.put("key", newValue);
这段代码并不是线程安全的,有可能发生多个线程更新的值相等。因为 ConcurrentHashMap 的设计目的是保护内部结构不被破坏,任何 get 、put 操作不会导致内部链表节点丢失或形成回路,而不是维护操作顺序的原子性。
正确的更新方法,CAS方式循环更新:
do {
oldValue = map.get("key");
newValue = oldValue == null ? 1 : oldValue + 1;
} while (!map.replace("key", oldValue, newValue));
Java 8 中也可以这样
ConcurrentHashMap<String, LongAdder> map = new ConcurrentHashMap<>();
map.putIfAbsent("key", new LongAdder()).increment();
4.3.2 批量数据操作
Java 8 为 ConcurrentHashMap 提供了批量数据操作, 即使其它线程同时操作时也可以安全的执行。 批量数据操作有三类:
- search:对所有的键和(或)值应用一个函数,直到函数返回一个非 null 的结果。
- reduce:通过提供的聚合函数,将所有的键和(或)值组合起来。
- forEach:对所有的键和(或)值应用一个函数,注意这个与 Map 中的 forEach 方法不一样。
每类操作都有对键、值、键和值、Map.Entry对象操作的4个版本,列举一下。
- searchKeys / reduceKeys / forEachKey
- searchValues / reduceValues / forEachValues
- search / reduce / forEach
- searchEntries / reduceEntries / forEachEntry
在使用这几个操作的时候,需要指定一个 并行阀值。如果 map.size 大于阀值,批量操作就以并行的方式执行。如果不想并行执行,可以使用 Long.MAX_VALUE作为阀值;如果想尽可能的多线程执行,可以用1作为阀值。
search
很简单,看 一眼 API 就能明白,以一个方法api为例:
U search(long threshold, BiFunction<? super K, ? super V, ? extends U> f)
forEach
forEach 方法有两种形式,第一种就是我们常用的,参数为消费者,只不过这里参数多了一个阀值,
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.forEach(3, (k, v) -> System.out.println(k +"-" + v));
第二中又多了一个参数,是一个转换其函数,转换后的结果传递给消费者函数,如果转换后的结果为 null,值会被跳过。
map.forEach(3,
(k, v) -> k +"-"+v, // 转换器
System.out::print); // 消费者
reduce
用法见前面章节介绍,这里也是多了一个阀值参数。
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
long threshold = 3;
map.reduceValues(threshold, (s, s2) -> s + s2);
同 forEach 一样,你也可以提供一个转换器函数。
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
long threshold = 3;
map.reduceValues(threshold,
v -> v == null ? null: v + " ", // 转换器
(s, s2) -> s + s2); // 聚合方法
4.4 并行数组
4.4.1 sort
可能是因为设计者想更明确的表达并行排序,所以并没有修改原 sort方法,而是新增加了 parallelSort 和 parallel* 等方法。最简单的并行排序如下,跟 sort 没什么区别。
String[] arr = {"an", "ba", "daf", "ads", "ca"};
Arrays.parallelSort(arr);
4.4.2 parallelSetAll
这个比较有意思,它的参数是一个位置索引,所以能构造跟位置有关的数组值,
String[] arr = {"an", "ba", "daf", "ads", "ca"};
Arrays.parallelSetAll(arr, i -> arr[i] + i);
for (String s : arr) {
System.out.println(s);
}
// 输出结果是:
an0
ba1
daf2
ads3
ca4
4.4.3 parallelPrefix
类似reduce, 对数组内元素进行聚合
String[] arr = {"an", "ba", "daf", "ads", "ca"};
Arrays.parallelPrefix(arr, (x, y) -> x+"+"+y);
for (String s : arr) {
System.out.println(s);
}
// 输出结果
an
an+ba
an+ba+daf
an+ba+daf+ads
an+ba+daf+ads+ca
4.5 CompletableFuture
相对于老的 Future 增加了类似 Stream API 流水操作的功能,使同一个线程执行的代码能写在一起,虽然他们不是连续执行的。
CompletableFuture<String> contents = CompletableFuture.completedFuture("abcdef");
System.out.println("111");
// 同一线程执行的代码写在一起
CompletableFuture<String> c2 = contents.thenApplyAsync(x -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("333");
return x;
});
c2.thenAcceptAsync(x -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(x);
});
// 其他的业务在 Future 执行之前执行
System.out.println("222");
// 防止测试程序结束
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 输出顺序是:
111
222
333
abcdef
上面的例子为了顺序清晰,加了一些恶心的代码,精简一下应该是这样
CompletableFuture<String> contents = CompletableFuture.completedFuture("abcdef");
// 同一线程执行的代码写在一起
contents.thenApplyAsync(x -> x).thenAcceptAsync(System.out::println);
// 其他的业务在 Future 执行之前执行
System.out.println("222");
CompletableFuture 流式操作还有几个实用的方法,就不写demo了,需要用的时候再查API。