4.1 概述

这部分用的不多,仅简单记录一下新特性

4.2 原子值

4.2.1 更新方法

Java 8 增加了两个新方法 updateAndGet 和 accumulateAndGet 用于更新原子类的值,取代老API的循环方式。

AtomicInteger ai = new AtomicInteger();

// 多线程环境下安全更新
Integer i = ai.updateAndGet(x -> 5);
System.out.println(i);   // 输出 5

// 多线程环境下安全更新  将原子值和传入的参数组合
ai.accumulateAndGet(1, (oldValue, paramValue) -> oldValue + paramValue);
System.out.println(ai.get());   // 输出 6

4.2.2 LongAdder

如果程序内有高度的竞争,大量的线程访问同一个原子值,可以使用 LongAdder 和 LongAccumulator,这个类是 Java 8 提供用于在高度竞争环境下替代 AtomicLong 的。

LongAdder adder = new LongAdder();
adder.add(2);
System.out.println(adder.intValue());   // 输出 2

adder.increment();
System.out.println(adder.sum());  // 输出 3  sum方法返回long型

// 同 accumulateAndGet 方法, 将原子值和传入的参数组合
LongAccumulator la = new LongAccumulator((left, right) -> left + right, 10);
System.out.println(la.intValue());  // 输出 10

4.3 ConcurrentHashMap

额外提一点,在 1.5 版本, ConcurrentHashMap 存在死锁的可能(具体源码就不再分析了),1.6 版本以后修复了这个问题。

4.3.1 更新值

一个误区eg: 对关键字计数

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
Integer oldValue = map.get("key");
Integer newValue = oldValue == null ? 1 : oldValue + 1;
map.put("key", newValue);

这段代码并不是线程安全的,有可能发生多个线程更新的值相等。因为 ConcurrentHashMap 的设计目的是保护内部结构不被破坏,任何 get 、put 操作不会导致内部链表节点丢失或形成回路,而不是维护操作顺序的原子性。

正确的更新方法,CAS方式循环更新:

do {
    oldValue = map.get("key");
    newValue = oldValue == null ? 1 : oldValue + 1;
} while (!map.replace("key", oldValue, newValue));

Java 8 中也可以这样

ConcurrentHashMap<String, LongAdder> map = new ConcurrentHashMap<>();
map.putIfAbsent("key", new LongAdder()).increment();

4.3.2 批量数据操作

Java 8 为 ConcurrentHashMap 提供了批量数据操作, 即使其它线程同时操作时也可以安全的执行。 批量数据操作有三类:

  • search:对所有的键和(或)值应用一个函数,直到函数返回一个非 null 的结果。
  • reduce:通过提供的聚合函数,将所有的键和(或)值组合起来。
  • forEach:对所有的键和(或)值应用一个函数,注意这个与 Map 中的 forEach 方法不一样。

每类操作都有对键、值、键和值、Map.Entry对象操作的4个版本,列举一下。

  • searchKeys / reduceKeys / forEachKey
  • searchValues / reduceValues / forEachValues
  • search / reduce / forEach
  • searchEntries / reduceEntries / forEachEntry

在使用这几个操作的时候,需要指定一个 并行阀值。如果 map.size 大于阀值,批量操作就以并行的方式执行。如果不想并行执行,可以使用 Long.MAX_VALUE作为阀值;如果想尽可能的多线程执行,可以用1作为阀值。

很简单,看 一眼 API 就能明白,以一个方法api为例:

U search(long threshold, BiFunction<? super K, ? super V, ? extends U> f)

forEach

forEach 方法有两种形式,第一种就是我们常用的,参数为消费者,只不过这里参数多了一个阀值,

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.forEach(3, (k, v) -> System.out.println(k +"-" + v));

第二中又多了一个参数,是一个转换其函数,转换后的结果传递给消费者函数,如果转换后的结果为 null,值会被跳过。

map.forEach(3,
        (k, v) -> k +"-"+v,       // 转换器
        System.out::print);       // 消费者

reduce

用法见前面章节介绍,这里也是多了一个阀值参数。

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
long threshold = 3;
map.reduceValues(threshold, (s, s2) -> s + s2);

同 forEach 一样,你也可以提供一个转换器函数。

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
long threshold = 3;
map.reduceValues(threshold,
        v -> v == null ? null: v + " ",  // 转换器
        (s, s2) -> s + s2);              // 聚合方法

4.4 并行数组

4.4.1 sort

可能是因为设计者想更明确的表达并行排序,所以并没有修改原 sort方法,而是新增加了 parallelSort 和 parallel* 等方法。最简单的并行排序如下,跟 sort 没什么区别。

String[] arr = {"an", "ba", "daf", "ads", "ca"};
Arrays.parallelSort(arr);

4.4.2 parallelSetAll

这个比较有意思,它的参数是一个位置索引,所以能构造跟位置有关的数组值,

String[] arr = {"an", "ba", "daf", "ads", "ca"};

Arrays.parallelSetAll(arr, i -> arr[i] + i);

for (String s : arr) {
    System.out.println(s);
}

// 输出结果是:
an0
ba1
daf2
ads3
ca4

4.4.3 parallelPrefix

类似reduce, 对数组内元素进行聚合

String[] arr = {"an", "ba", "daf", "ads", "ca"};

Arrays.parallelPrefix(arr, (x, y) -> x+"+"+y);

for (String s : arr) {
    System.out.println(s);
}
// 输出结果
an
an+ba
an+ba+daf
an+ba+daf+ads
an+ba+daf+ads+ca

4.5 CompletableFuture

相对于老的 Future 增加了类似 Stream API 流水操作的功能,使同一个线程执行的代码能写在一起,虽然他们不是连续执行的。

CompletableFuture<String> contents = CompletableFuture.completedFuture("abcdef");

System.out.println("111");

// 同一线程执行的代码写在一起
CompletableFuture<String> c2 = contents.thenApplyAsync(x -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println("333");
    return x;
});

c2.thenAcceptAsync(x -> {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    System.out.println(x);
});

// 其他的业务在 Future 执行之前执行
System.out.println("222");

// 防止测试程序结束
try {
    Thread.sleep(10000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

// 输出顺序是:
111
222
333
abcdef

上面的例子为了顺序清晰,加了一些恶心的代码,精简一下应该是这样

CompletableFuture<String> contents = CompletableFuture.completedFuture("abcdef");

// 同一线程执行的代码写在一起
contents.thenApplyAsync(x -> x).thenAcceptAsync(System.out::println);

// 其他的业务在 Future 执行之前执行
System.out.println("222");

CompletableFuture 流式操作还有几个实用的方法,就不写demo了,需要用的时候再查API。