看了前两篇你肯定已经理解了 java 并发编程的底层构建。然而,在实际编程中,应该经可能的远离底层结构,毕竟太底层的东西用起来是比较容易出错的,特别是并发编程,既难以调试,也难以发现问题,我们还是使用由并发处理的专业人员实现的较高层次的结构要方便、安全得多。

阻塞队列

  对于许多线程问题,都可以使用一个或多个队列来安全、优雅的进行数据的传递。比如经典的生产者–消费者问题,生产者不停的生成某些数据,消费者需要处理数据,在多线程环境中,如何安全的将数据从生产者线程传递到消费者线程?

  无需使用锁和条件对象,java 自带的阻塞队列就能够完美的解决这个问题。阻塞队列中所有方法都是线程安全的,所以我们进行读取、写入操作时无需考虑并发问题。阻塞队列主要有以下几种方法:

方法 正常结果 异常结果
add 添加一个元素 队列满,抛出 IllegalStateException 异常
element 返回队列头元素 队列空,抛出 NoSuckElementException 异常
offer 添加一个元素,返回 true 队列满,返回 false
peek 返回队列的头元素 队列空,返回 null
poll 移出并返回队列头元素 队列空,返回 null
put 添加一个元素 队列满,阻塞
remove 移出并返回头元素 队列空,抛出 NoSuckElementException 异常
take 移出并返回头元素 队列空,则阻塞

上面的方法主要分成了三类,第一类:异常情况下抛出异常;第二类:异常情况返回 false/null;第三类:异常情况下阻塞。可以根据自身情况选择合适的方法来操作队列。

阻塞队列的实现

  在 java.util.concurrent 包中,提供了阻塞队列的几种实现,当前也可以自己实现 BlockingQueue 接口,实现自己的阻塞队列。

  • LinkdedBlockingQueue:链式阻塞队列。一般情况下链式的结构容量都是没有上限的,但是也可以选择手动指定最大容量。
  • LinkdedBlockingDeque:链式阻塞双端队列。
  • PriorityBlockingQueue:优先级队列。按照优先级移出,无容量上限。
  • ArrayBlockingQueue:数组队列,需指定容量。可选指定是否需要公平性,如果设置了公平性,等待了最长时间的线程会优先得到处理,但是会降低性能。

延迟队列

  DelayQueue 也是阻塞队列的一种,不过它要求队列中的元素实现Delayed接口。需要重新两个方法:

  • long getDelay(TimeUnit unit)返回延迟的时间,负值表示延迟结束,只有延迟结束的情况下,元素才能从队列中移出。
  • int compareTo(Delayed o)比较方法,DelayQueue 使用该方法对元素进行排序。

传递队列

  在 Java SE 7 中新增了一个 TransferQueue 接口,允许生产者等待,直到消费者消费了某个元素。原本生产者消费者是没有关系的,生产者并不知道某个元素是否被消费者消费了。通过此接口可以让生产者知道某个元素确实被消费了。如果生产者调用:

q.transer(item)

方法,这个调用会阻塞,知道 item 被消费线程取出消费。LinkedTransferQueue 实现了此接口。

线程安全的集合

  如果多个线程并发的操作集合,会很容易出现问题,我们可以选择锁来保护共享数据,但是更好的选择是使用线程安全的集合来作为替代。本节介绍 Java 类库中提供的线程安全的集合(上一节介绍的阻塞队列也在其中)。

  这类集合,size 是通过便利得出的,较慢。而且如果 size 数量大于 20 亿,有可能超过 int 的范围,使用 size 方法无法获取到大小,在 java8 中引入了 mappingCount 方法,返回值类型为 long。

映射 map

  映射是日常使用中非常常见的一种数据结构。共有以下几种线程安全的映射:

  • ConcurrentSkipListMap:有序映射,根据键排序
  • ConcurrentHashMap:无序映射

映射条目的原子更新

  一旦涉及到多线程环境,做啥都比较麻烦,比如更新一个 map 中某个键值对的值,下面的操作显然是不正确的:

int old = map.get(key);
map.put(key,old+1);

假如有两个线程同时操作一个 key,虽然 put 方法是线程安全的,但是由于两个线程之前读取的 old 是一样的,这样就会导致某个线程的修改被覆盖掉。

  有以下几种安全的更新方法:

  1. 使用 repalce(key,oldValue,newValue)方法,此方法会在 key,oldValue 完全匹配时将 oldValue 换为 newValue 返回 true,否则返回 false。
  2. 使用 AtomicLong 或者 LongAdder 作为映射的值,这两个的操作方法是原子性的,因此可以安全的修改值。 3.使用 compute 类似方法完成更新。比如下面的:
# 如果key不再map中,v的值为null
map.compute(key,(k,v)->v==null?1:v+1);

# 如果不存在key
map.computeIfAbsent(key,key->new LongAdder())

# 如果存在key
map.computeIfPresent(key,key->key+1)

# compute方法类似,不过不处理键
map.merge(key,value,(existingValue,newValue)->existingValue+newValue+1)

批操作

  java8 引入的,即使有其他线程在处理映射,批操作也能安全的执行。批操作会遍历映射,处理便利过程中找到的元素,且无需冻结当前映射的快照。显然通过批操作获取的结果不是完全精确的,因为遍历过程中,元素可能会被改变。

  有以下三种不同的操作:

  • 搜索(search),遍历结果直到返回一个非 null 的结果
  • 归约(reduce),组合所有键或值,需提供累加函数
  • forEach,遍历所有的键值对 每个操作都有 4 个版本:
  • operationKeys:处理键
  • operationValues:处理值
  • operation:处理键值
  • operationEntries:处理需要 map.Entry 对象

并发集合

  线程安全的 set 集合只有以下一种:

  • ConcurrentSkipListSet:有序 set 如果我们想要一个 hash 结构的,线程安全的 set,有以下几种办法.
  1. 通过 ConcurrentHashMap.<Key>newKeySet()生成一个 Set,比如:
Set<String> sets = ConcurrentHashMap.<String>newKeySet();

这其实只是 ConcurrentHashMap<Key,Boolean>的一个包装器,所有的值都为 true

  1. 通过现有映射对象的 keySet 方法,生成这个映射的键集。如果删除这个集的某个元素,映射上对于元素也会被删除。但是不能添加元素,因为没有相应的值。java8 新增了一个 keySet 方法,可以设置一个默认值,这样就能为向集合中增加元素。

数组

  在 Concurrent 包中只有一个CopyOnWriteArrayList数组。该数组所有的修改都会对底层数组进行复制,也就是每插入一个元素都会将原来的数组复制一份并加入新的元素。

  当构建一个迭代器时,迭代器指向的是当前数组的引用,如果后来数组被修改了,迭代器指向的任然是旧的数组。

任何集合类都可以通过使用同步包装器变成线程安全的,如下:

//线程安全的列表
List<String> list1 = Collections.synchronizedList(new ArrayList<>());
//线程安全的map
Map<String,String> map1 = Collections.synchronizedMap(new HashMap<>());
//线程安全的set
Set<String> set1 = Collections.synchronizedSet(new HashSet<>());

并行数组算法

  在 java 8 中,Arrays 类提供了大量的并行化操作。

  1. Arrays.parallelSort

  对一个基本数据类型或对象的数组进行排序

  1. Arrays.paralletSetAll

  用一个函数计算得到的值填充一个数组。这个函数接收元素索引,然后计算值。例如:

# 将所有值加上对于的序号
Arrays.parallelSetAll(arr,i->i+ arr[i]);
  1. parallelPrefix

  用对应一个给定结合操作的前缀的累加结果替换各个数组元素。看文字描述不太容易看懂,这里用一个例子说明:

int[] arr = {1,2,3,4}
Arrays.parallelPrefix(arr,(x,y)->x*y);
// arr变成:[1,1*2,1*2*3,1*2*3*4]