并发容器

并发容器

Vector

在 JDK1.5 之前,如果想要使用并发安全的 List 只能选择 Vector。而 Vector 是一种老旧的集合,已经被淘汰。**Vector 对于增删改查等方法基本都加了 synchronized**,这种方式虽然能够保证同步,但这相当于对整个 Vector 加上了一把大锁,使得每个方法执行的时候都要去获得锁,导致性能非常低下。

CopyOnWriteArrayList

JDK1.5 引入了 Java.util.concurrent(JUC)包,其中提供了很多线程安全且并发性能良好的容器,其中唯一的线程安全 List 实现就是 CopyOnWriteArrayList

**核心思想Copy-On-Write **

CopyOnWriteArrayList名字中的“Copy-On-Write”即写时复制,简称 COW。

下面是维基百科对 Copy-On-Write 的介绍,介绍的挺不错:

写入时复制(英语:Copy-on-write,简称 COW)是一种计算机程序设计领域的优化策略。其核心思想是,如果有多个调用者(callers)同时请求相同资源(如内存或磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者试图修改资源的内容时,系统才会真正复制一份专用副本(private copy)给该调用者,而其他调用者所见到的最初的资源仍然保持不变。这过程对其他的调用者都是透明的。此作法主要的优点是如果调用者没有修改该资源,就不会有副本(private copy)被创建,因此多个调用者只是读取操作时可以共享同一份资源。

这里再以 CopyOnWriteArrayList为例介绍:当需要修改( addsetremove 等操作) CopyOnWriteArrayList 的内容时,不会直接修改原数组,而是会先创建底层数组的副本,对副本数组进行修改,修改完之后再将修改后的数组赋值回去,这样就可以保证写操作不会影响读操作了。

可以看出,写时复制机制非常适合读多写少的并发场景,能够极大地提高系统的并发性能。

不过,写时复制机制并不是银弹,其依然存在一些缺点,下面列举几点:

  1. 内存占用:每次写操作都需要复制一份原始数据,会占用额外的内存空间,在数据量比较大的情况下,可能会导致内存资源不足。
  2. 写操作开销:每一次写操作都需要复制一份原始数据,然后再进行修改和替换,所以写操作的开销相对较大,在写入比较频繁的场景下,性能可能会受到影响。
  3. 数据一致性问题:修改操作不会立即反映到最终结果中,还需要等待复制完成,这可能会导致一定的数据一致性问题。

ConcurrentLinkedQueue

Java 提供的线程安全的 Queue 可以分为阻塞队列和非阻塞队列,非阻塞队列的典型例子是 ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。 阻塞队列可以通过加锁来实现,非阻塞队列可以通过 CAS 操作实现。

从名字可以看出,ConcurrentLinkedQueue这个队列使用链表作为其数据结构.ConcurrentLinkedQueue 应该算是在高并发环境中性能最好的队列了。它之所有能有很好的性能,是因为其内部复杂的实现。

ConcurrentLinkedQueue 主要使用 CAS 非阻塞算法来实现线程安全就好了。

ConcurrentLinkedQueue 适合在对性能要求相对较高,同时对队列的读写存在多个线程同时进行的场景,即如果对队列加锁的成本较高则适合使用无锁的 ConcurrentLinkedQueue 来替代。

阻塞队列BlockingQueue

上面我们己经提到了 ConcurrentLinkedQueue为高性能的非阻塞队列。下面我们要讲到的是阻塞队列——BlockingQueue。阻塞队列(BlockingQueue)被广泛使用在“生产者-消费者”问题中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。

BlockingQueue 是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口。下面是 BlockingQueue 的相关实现类:

ArrayBlockingQueue

  • ArrayBlockingQueue BlockingQueue 接口的有界队列实现类,底层采用数组来实现。
  • 队列满时插入操作被阻塞,队列空时,移除操作被阻塞。
  • 按照先进先出(FIFO)原则对元素进行排序。
  • 默认不保证线程公平的访问队列,因为采用的是ReentrantLock来实现并发控制。
  • 公平访问队列:按照阻塞的先后顺序访问队列,即先阻塞的线程先访问队列。
  • 非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格。有可能先阻塞的线程最后才访问访问队列。
  • 公平性会降低吞吐量。
1
2
3
public class ArrayBlockingQueue<E>
extends AbstractQueue<E>
implements BlockingQueue<E>, Serializable{}

ArrayBlockingQueue 一旦创建,容量不能改变。其并发控制采用可重入锁 ReentrantLock ,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。

ArrayBlockingQueue 默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到 ArrayBlockingQueue。而非公平性则是指访问 ArrayBlockingQueue 的顺序不是遵守严格的时间顺序,有可能存在,当 ArrayBlockingQueue 可以被访问时,长时间阻塞的线程依然无法访问到 ArrayBlockingQueue。如果保证公平性,通常会降低吞吐量。

如果需要获得公平性的 ArrayBlockingQueue,可采用如下代码:

1
private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);

LinkedBlockingQueue

LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足 FIFO 的特性,与 ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。

通常在创建 LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE静态工厂方法 Executors.newFixedThreadPool() 使用了这个队列。(newFixedThreadPool 用于创建固定线程数)

相关构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
*某种意义上的无界队列
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

/**
*有界队列
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

PriorityBlockingQueue

  • PriorityBlockQueue = PriorityQueue + BlockingQueue
  • 元素默认自然排序。
  • 可以自定义CompareTo()方法来指定元素排序规则。
  • 可以通过构造函数构造参数Comparator来对元素进行排序。

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则

PriorityBlockingQueue 并发控制采用的是可重入锁 ReentrantLock队列为无界队列(ArrayBlockingQueue 是有界队列,LinkedBlockingQueue 也可以通过在构造函数中传入 capacity 指定队列最大的容量,但是 PriorityBlockingQueue 只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容)。

简单地说,它就是 PriorityQueue 的线程安全版本。不可以插入 null 值,同时,插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。

SynchronousQueue

  • SynchronousQueue负责把生产者产生的数据传递给消费者线程。
  • SynchronousQueue本身不存储数据,调用了put方法后,队列里面也是空的。
  • 每一个put操作必须等待一个take操作完成,否则会阻塞
  • 适合传递性场景。
  • 性能高于ArrayBlockingQueue 和 LinkedBlockingQueue。

它是一个特殊的队列,它的名字其实就蕴含了它的特征,同步的队列。为什么说是同步的呢?这里说的并不是多线程的并发问题,而是因为当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作。这里的 Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。

我们比较少使用到 SynchronousQueue 这个类,不过它在线程池的实现类 ThreadPoolExecutor 中得到了应用

虽然上面我说了队列,但是 SynchronousQueue 的队列其实是虚的,其不提供任何空间(一个都没有)来存储元素。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。

你不能在 SynchronousQueue 中使用 peek 方法(在这里这个方法直接返回 null),peek 方法的语义是只读取不移除,显然,这个方法的语义是不符合 SynchronousQueue 的特征的。SynchronousQueue 也不能被迭代,因为根本就没有元素可以拿来迭代的。虽然 SynchronousQueue 间接地实现了 Collection 接口,但是如果你将其当做 Collection 来用的话,那么集合是空的。当然,这个类也是不允许传递 null 值的(并发包中的容器类好像都不支持插入 null 值,因为 null 值往往用作其他用途,比如用于方法的返回值代表操作失败)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SynchronousQueueExample {
public static void main(String[] args) {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();

ExecutorService executorService = Executors.newFixedThreadPool(2);

executorService.submit(() -> {
try {
// 生产者线程,等待消费者线程
queue.put(1);
System.out.println("Producer: Put 1");
} catch (InterruptedException e) {
e.printStackTrace();
}
});

executorService.submit(() -> {
try {
// 消费者线程,等待生产者线程
Integer take = queue.take();
System.out.println("Consumer: Take " + take);
} catch (InterruptedException e) {
e.printStackTrace();
}
});

executorService.shutdown();
}
}

DelayQueue

DelayQueue 是 JUC 包(java.util.concurrent)为我们提供的延迟队列,用于实现延时任务比如订单下单 15 分钟未支付直接取消。它是 BlockingQueue 的一种,底层是一个基于 PriorityQueue 实现的一个无界队列,是线程安全的。

核心特性

  • 延迟获取元素DelayQueue 存储实现了 Delayed 接口的对象,这些对象只能在其指定的延迟时间到期后才能从队列中取出。
  • 基于优先队列实现:内部使用 PriorityQueue 来维护元素的顺序,确保最先到期的任务总是优先被处理。
  • 线程安全:作为 BlockingQueue 的一种,DelayQueue 是线程安全的,可以在多线程环境中使用。
  • 无界队列DelayQueue 是无界的,这意味着它可以存储任意数量的元素,直到内存耗尽。

应用场景

  • 定时任务调度:可以用来调度将在未来某一时间点执行的任务,例如定时发送邮件、定期数据清理等。
  • 缓存过期管理:存储带有过期时间的缓存项,当项的过期时间到达时,自动从队列中移除,从而实现缓存项的自动过期。
  • 订单处理:如限时支付场景,可以将订单加入到 DelayQueue 中,并设定一个延迟时间,如果在该时间内没有完成支付,则从队列中取出并处理为过期订单。
  • 消息队列中的延时消息:在消息队列系统中,可以利用 DelayQueue 来实现消息的延时发送功能。

使用条件

  • 存放在 DelayQueue 中的元素必须实现 Delayed 接口,该接口继承自 Comparable 接口,因此需要实现 compareTogetDelay 方法。
  • getDelay 方法返回到激活日期的剩余时间,而 compareTo 方法用于比较延迟时间,决定元素在队列中的顺序。

DelayQueue 通过减少不必要的轮询和提高资源效率,为延迟任务提供了一种高效的处理方式。开发者可以轻松地将任务加入队列,并在任务到期时自动处理,从而避免了资源的浪费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// 定义一个任务类,实现Delayed接口
class DelayedTask implements Delayed {
private String taskName;
private long executionTime; // 任务执行的时间

public DelayedTask(String taskName, long delaySeconds) {
this.taskName = taskName;
this.executionTime = System.currentTimeMillis() + delaySeconds * 1000;
}

@Override
public long getDelay(TimeUnit unit) {
long diff = executionTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed o) {
DelayedTask other = (DelayedTask) o;
return Long.compare(this.executionTime, other.executionTime);
}

@Override
public String toString() {
return "DelayedTask{" +
"taskName='" + taskName + '\'' +
", executionTime=" + executionTime +
'}';
}

public void execute() {
System.out.println("Executing task: " + taskName);
}
}

public class DelayQueueExample {
public static void main(String[] args) {
DelayQueue<DelayedTask> queue = new DelayQueue<>();

// 添加几个任务到队列
queue.put(new DelayedTask("Task 1", 2));
queue.put(new DelayedTask("Task 2", 5));
queue.put(new DelayedTask("Task 3", 1));

// 处理任务
while (true) {
try {
// 取出并执行任务,如果队列为空,则等待
DelayedTask task = queue.take();
task.execute();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
}

输出:

Executing task: Task 3
Executing task: Task 1
Executing task: Task 2

ConcurrentSkipListMap

下面这部分内容参考了极客时间专栏《数据结构与算法之美》以及《实战 Java 高并发程序设计》。

为了引出 ConcurrentSkipListMap,先带着大家简单理解一下跳表。

对于一个单链表,即使链表是有序的,如果我们想要在其中查找某个数据,也只能从头到尾遍历链表,这样效率自然就会很低,跳表就不一样了。跳表是一种可以用来快速查找的数据结构,有点类似于平衡树。它们都可以对元素进行快速的查找。但一个重要的区别是:对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整。而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。这样带来的好处是:在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。而对于跳表,你只需要部分锁即可。这样,在高并发环境下,你就可以拥有更好的性能。而就查询的性能而言,跳表的时间复杂度也是 O(logn) 所以在并发数据结构中,JDK 使用跳表来实现一个 Map。

跳表内的所有链表的元素都是排序的。查找时,可以从顶级链表开始找。一旦发现被查找的元素大于当前链表中的取值,就会转入下一层链表继续找。这也就是说在查找过程中,搜索是跳跃式的。如上图所示,在跳表中查找元素 18。

image-20240406214644397

查找 18 的时候原来需要遍历 18 次,现在只需要 7 次即可。针对链表长度比较大的时候,构建索引查找效率的提升就会非常明显。

从上面很容易看出,跳表是一种利用空间换时间的算法。

使用跳表实现 Map 和使用哈希算法实现 Map 的另外一个不同之处是:哈希并不会保存元素的顺序,而跳表内所有的元素都是排序的。因此在对跳表进行遍历时,你会得到一个有序的结果。所以,如果你的应用需要有序性,那么跳表就是你不二的选择。JDK 中实现这一数据结构的类是 ConcurrentSkipListMap

ConcurrentSkipListMap 在实现时利用了跳表的特性,通过多层索引和随机化插入节点来实现并发访问下的高效性和线程安全性。它在多线程环境下可以同时进行读写操作而无需额外的同步措施,因为每个线程操作的区域都是独立的,不会相互影响。这使得 ConcurrentSkipListMap 成为一种非常适合高并发环境下使用的有序映射实现。


并发容器
http://example.com/2023/09/30/Java/Java多并发/并发容器/
作者
PALE13
发布于
2023年9月30日
许可协议