并发集合类
普通集合(ArrayList、HashMap)不是线程安全的。强行在多线程中使用会导致数据不一致、死循环(JDK 7 HashMap 扩容)甚至程序崩溃。JDK 提供了多种并发集合来解决这些问题。
为什么不用 Collections.synchronizedXxx
// 简单但粗暴——所有方法加 synchronized
Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
问题:所有操作都用同一把锁,读和读之间也互斥,并发性能极差。且复合操作(先检查再修改)仍然不是原子的:
// 不安全!check-then-act 不是原子操作
if (!map.containsKey(key)) { // 检查
map.put(key, value); // 操作——两步之间可能有其他线程插入
}
ConcurrentHashMap
JDK 7:Segment 分段锁
ConcurrentHashMap (JDK 7)
│
├── Segment[0] (继承 ReentrantLock)
│ └── HashEntry[] → 链表
│
├── Segment[1]
│ └── HashEntry[] → 链表
│
├── ...
│
└── Segment[15] (默认 16 个 Segment)
└── HashEntry[] → 链表
- 将数据分为 16 个 Segment,每个 Segment 有独立的锁
- 不同 Segment 的操作可以并行——理论上最多 16 个线程同时写
- 读操作不加锁(
volatile保证可见性) - 缺点:锁的粒度仍然较粗,Segment 数量固定
JDK 8+:CAS + synchronized
JDK 8 彻底重构了 ConcurrentHashMap,抛弃分段锁,改为对单个桶(Node)加锁:
ConcurrentHashMap (JDK 8+)
│
Node[] table
├── [0] null
├── [1] Node → Node → Node (链表)
├── [2] TreeBin → TreeNode (红黑树)
├── [3] null
├── ...
└── [N] Node
↑
对单个桶头节点加 synchronized
put 流程
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 1. key 和 value 都不允许为 null
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 2. 懒初始化(CAS)
else if ((f = tabAt(tab, i = (n-1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<>(hash, key, value)))
break; // 3. 桶为空 → CAS 放入(不加锁)
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); // 4. 正在扩容,帮助迁移
else {
synchronized (f) { // 5. 桶不为空 → 锁住头节点
if (tabAt(tab, i) == f) { // double-check
if (fh >= 0) {
// 链表:遍历,找到相同 key 则更新,否则尾插
} else if (f instanceof TreeBin) {
// 红黑树:树插入
}
}
}
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); // 6. 链表→红黑树
}
}
addCount(1L, binCount); // 7. 计数 + 考虑扩容
return null;
}
核心要点:
- 空桶用 CAS,非空桶用 synchronized
- 锁的粒度是单个桶的头节点,不同桶完全并行
- 扩容时多线程协作迁移(helpTransfer)
扩容:多线程协作
JDK 8 的扩容不是一个线程独自完成的。当一个线程发现需要扩容时:
- 创建新的
nextTable(容量翻倍) - 将旧表按照步长(stride)划分为多个区间
- 每个线程负责迁移一个区间的桶
- 迁移完的桶放一个
ForwardingNode(hash = MOVED),告诉其他线程"这个桶迁移完了" - 其他线程如果 put 到正在迁移的桶,也会参与帮忙
size() 的实现
JDK 8 的 ConcurrentHashMap 使用 baseCount + CounterCell[] 来统计大小(类似 LongAdder 的思想),在高并发下比 AtomicLong 性能更好。
为什么 key 和 value 不能为 null?
HashMap 允许 null key 和 null value,但 ConcurrentHashMap 不允许。原因是二义性:
map.get(key); // 返回 null
// 是 key 不存在?还是 value 就是 null?
// HashMap 中可以用 containsKey 判断,但 ConcurrentHashMap 中
// get 和 containsKey 不是原子的——两步之间可能有其他线程修改
CopyOnWriteArrayList
写时复制:每次修改时复制整个底层数组,适合读多写少的场景。
// 写操作
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); // 复制
newElements[len] = e;
setArray(newElements); // 替换引用
return true;
} finally {
lock.unlock();
}
}
// 读操作——不加锁
public E get(int index) {
return get(getArray(), index); // 直接读,没有同步
}
特点:
- ✅ 读操作无锁,性能极高
- ✅ 迭代器不会抛
ConcurrentModificationException - ❌ 写操作需要复制整个数组,内存开销大
- ❌ 数据弱一致性——读到的可能是旧数据
适用场景:事件监听器列表、配置缓存等读多写少的场景。
CopyOnWriteArraySet 是基于 CopyOnWriteArrayList 实现的 Set(contains 用线性查找)。
BlockingQueue(阻塞队列)
阻塞队列是生产者-消费者模式的基础设施。当队列满时,put() 会阻塞;当队列空时,take() 会阻塞。
操作方式对比
| 抛异常 | 返回特殊值 | 阻塞等待 | 超时 | |
|---|---|---|---|---|
| 插入 | add(e) |
offer(e) |
put(e) |
offer(e, timeout) |
| 移除 | remove() |
poll() |
take() |
poll(timeout) |
| 查看 | element() |
peek() |
- | - |
常见实现
| 实现 | 特点 |
|---|---|
ArrayBlockingQueue |
基于数组,有界,一把锁 |
LinkedBlockingQueue |
基于链表,可选有界(默认无界),两把锁(头尾分离) |
PriorityBlockingQueue |
无界优先级队列 |
DelayQueue |
延迟队列,元素到期才能取出 |
SynchronousQueue |
容量为 0,put 必须等 take |
LinkedTransferQueue |
融合了 SynchronousQueue 和 LinkedBlockingQueue |
ArrayBlockingQueue vs LinkedBlockingQueue
| 特性 | ArrayBlockingQueue | LinkedBlockingQueue |
|---|---|---|
| 底层结构 | 数组 | 链表 |
| 是否有界 | 必须指定容量 | 默认 Integer.MAX_VALUE |
| 锁 | 一把锁(put/take 互斥) | 两把锁(putLock + takeLock) |
| 内存 | 预分配,GC 友好 | 每次 put 创建 Node |
| 吞吐量 | 中等 | 更高(双锁) |
SynchronousQueue
没有容量的队列——put() 后线程阻塞,直到另一个线程 take()。实质上是线程间直接传递数据。
CachedThreadPool 使用 SynchronousQueue:每提交一个任务,要么立即有线程接手,要么创建新线程。
DelayQueue
元素必须实现 Delayed 接口,只有到期后才能被 take():
public class DelayedTask implements Delayed {
private final long executeTime; // 执行时间戳
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.executeTime, ((DelayedTask) o).executeTime);
}
}
DelayQueue<DelayedTask> queue = new DelayQueue<>();
queue.put(new DelayedTask(System.currentTimeMillis() + 5000)); // 5秒后过期
DelayedTask task = queue.take(); // 阻塞直到有元素过期
应用场景:订单超时关闭、缓存过期清理、延迟消息。
ConcurrentLinkedQueue
基于 CAS 的无锁并发队列,适合高并发场景。没有阻塞语义——poll() 返回 null 表示空队列。
与 BlockingQueue 的区别:不支持阻塞操作(put/take),需要自己处理空队列的情况。
ConcurrentSkipListMap
基于跳表的并发有序 Map,支持 O(log n) 的查找、插入和删除。
跳表结构:
Level 3: 1 ────────────────────── 9
Level 2: 1 ────── 4 ────────── 9
Level 1: 1 ── 3 ── 4 ── 6 ── 9
Level 0: 1 2 3 4 5 6 7 8 9
相当于并发版的 TreeMap。当需要有序的并发 Map 时使用。
选型指南
| 需求 | 推荐 |
|---|---|
| 高并发 KV 存储 | ConcurrentHashMap |
| 有序的并发 Map | ConcurrentSkipListMap |
| 读多写少的 List | CopyOnWriteArrayList |
| 生产者-消费者 | ArrayBlockingQueue / LinkedBlockingQueue |
| 线程间直接传递 | SynchronousQueue |
| 延迟任务 | DelayQueue |
| 高并发无锁队列 | ConcurrentLinkedQueue |
生产环境核心踩坑点
| 问题 | 答案要点 |
|---|---|
| ConcurrentHashMap JDK 7 vs 8 的实现? | 7: Segment分段锁; 8: CAS+synchronized锁单个桶 |
| 为什么 ConcurrentHashMap 不允许 null? | 消除 get 返回 null 的二义性 |
| CopyOnWriteArrayList 原理? | 写时复制数组,读无锁 |
| ArrayBlockingQueue 和 LinkedBlockingQueue 的区别? | 数组/链表; 一把锁/两把锁; 必须有界/默认无界 |
| SynchronousQueue 是什么? | 无容量队列,put 阻塞直到 take |