线程安全Map
HashTable
线程安全的版本,所有的操作方法包括读操作都是synchronized
同步方法,因此同一时刻只有一个线程能操作,效率比较差
基本结构
实现了Map接口,同时继承Dictionary抽象类。Dictionary只包含抽象方法,没有逻辑实现,相当于一个早期的接口,功能被Map接口替代
Hashtable.java 1 2 3 public class Hashtable <K ,V > extends Dictionary <K ,V > implements Map <K ,V >, Cloneable , java .io .Serializable { }
底层核心是Entry数组,Entry是个单向链表
HashTable.java 1 2 3 4 5 private transient Entry<?,?>[] table;private transient int count; private int threshold; private float loadFactor; private transient int modCount = 0 ;
操作
哈希清空符号位,正数取余定位到数组位置,然后开始链表遍历查找
值得注意的是还需要再比较哈希值,因为定位数组时砍掉了符号位再取余数,同一个位置依然有多个哈希值
HashTable.java 1 2 3 4 5 6 7 8 9 10 11 public synchronized V get (Object key) { Entry<?,?> tab[] = table; int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF ) % tab.length; for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) { if ((e.hash == hash) && e.key.equals(key)) { return (V)e.value; } } return null ; }
和HashMap不同, HashTable的key和value都不能是null
同样是先定位数组,然后链表查找,已经存在就更新值,不存在才新增
HashTable.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public synchronized V put (K key, V value) { if (value == null ) { throw new NullPointerException(); } Entry<?,?> tab[] = table; int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF ) % tab.length; @SuppressWarnings ("unchecked" ) Entry<K,V> entry = (Entry<K,V>)tab[index]; for (; entry != null ; entry = entry.next) { if ((entry.hash == hash) && entry.key.equals(key)) { V old = entry.value; entry.value = value; return old; } } addEntry(hash, key, value, index); return null ; }
新增数据有可能导致扩容,扩容后需要重新哈希
值得注意的是新增数据是链表头部插入,也就是底层数组位置都是新数据
HashTable.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private void addEntry (int hash, K key, V value, int index) { modCount++; Entry<?,?> tab[] = table; if (count >= threshold) { rehash(); tab = table; hash = key.hashCode(); index = (hash & 0x7FFFFFFF ) % tab.length; } @SuppressWarnings ("unchecked" ) Entry<K,V> e = (Entry<K,V>) tab[index]; tab[index] = new Entry<>(hash, key, value, e); count++; } private static class Entry <K ,V > implements Map .Entry <K ,V > { protected Entry (int hash, K key, V value, Entry<K,V> next) { this .hash = hash; this .key = key; this .value = value; this .next = next; } }
容量管理
默认容量是质数11,负载因子0.75
HashTable.java 1 2 3 public Hashtable () { this (11 , 0.75f ); }
扩容策略是2倍加1,这是因为很大概率能保持质数
扩容后原数据全部重新计算哈希,同样是头部插入
HashTable.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 protected void rehash () { int oldCapacity = table.length; Entry<?,?>[] oldMap = table; int newCapacity = (oldCapacity << 1 ) + 1 ; if (newCapacity - MAX_ARRAY_SIZE > 0 ) { if (oldCapacity == MAX_ARRAY_SIZE) return ; newCapacity = MAX_ARRAY_SIZE; } Entry<?,?>[] newMap = new Entry<?,?>[newCapacity]; modCount++; threshold = (int )Math.min(newCapacity * loadFactor, MAX_ARRAY_SIZE + 1 ); table = newMap; for (int i = oldCapacity ; i-- > 0 ;) { for (Entry<K,V> old = (Entry<K,V>)oldMap[i] ; old != null ; ) { Entry<K,V> e = old; old = old.next; int index = (e.hash & 0x7FFFFFFF ) % newCapacity; e.next = (Entry<K,V>)newMap[index]; newMap[index] = e; } } }
状态获取
状态变量count并没有volatile修饰,所以为了可见性即使是只读操作也进行了同步化,同一时刻限制只有一个线程访问
状态变量还带有transient修饰,序列化时忽略,反序列化逻辑readObject方法内会重新计算
HashTable.java 1 2 3 4 5 6 7 8 private transient int count;public synchronized int size () { return count; } public synchronized boolean isEmpty () { return count == 0 ; }
ConcurrentHashMap
Hashtable也是线程安全的,但是采用整体加锁,每个方法都有synchronized修饰,同一时刻只有一个线程可以访问。ConcurrentHashMap读操作不加锁,写操作方法内部局部加锁,读写可同时发生,读取的数据只能表示某一时刻的数据,对于批量操作putAll
或clear
,后续可能读到中间结果。同样,状态方法size
,isEmpty
和containsValue
只反映某一时刻结果,只作参考不用于逻辑控制。
Java8之前采用分段锁,数据分段后一个Segment包含若干Entry,Segment本身继承ReentrantLock访问不同数据段不构成竞争,可以同时进行
分段后多少分段就能多少并发默认16,但是遍历时还是相当于全部加一遍锁,Java8更多采用CAS+synchronized
机制,在bin节点上加锁缩小了加锁块,读不加锁,提升性能
基础结构
Java8的ConcurrentHashMap和HashMap基本思路一致,链表+红黑树
共同管理,底层依然是Node数组,与单线程环境下的HashMap不同的是,维护另一个数组
这是因为单线程扩容,直接扩容完成后才访问,扩容后数组直接替换原数组,因此只维护单一数组够用
而多线程环境下,扩容同时还有别的线程访问,因此需要显式维护还在构建中的扩容数组
ConcurrentHashMap.java 1 2 transient volatile Node<K,V>[] table;private transient volatile Node<K,V>[] nextTable;
节点类型
基础Node节点和单线程版本类似,用于链表
ConcurrentHashMap.java 1 2 3 4 5 6 7 static class Node <K ,V > implements Map .Entry <K ,V > { final int hash; final K key; V value; Node<K,V> next; }
并发环境下,节点有多个子类细化了角色
只有带有真正的数据的节点哈希码为正数,其他节点不含数据起到标志作用,哈希码是负数
ConcurrentHashMap.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 static final int MOVED = -1 ; static final int TREEBIN = -2 ; static final int RESERVED = -3 ; static final int HASH_BITS = 0x7fffffff ; static final class TreeNode <K ,V > extends Node <K ,V > { TreeNode<K,V> parent; TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; boolean red; } static final class TreeBin <K ,V > extends Node <K ,V > { TreeNode<K,V> root; volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; TreeBin(TreeNode<K,V> b) { super (TREEBIN, null , null , null ); } } static final class ForwardingNode <K ,V > extends Node <K ,V > { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super (MOVED, null , null , null ); this .nextTable = tab; } } static final class ReservationNode <K ,V > extends Node <K ,V > { ReservationNode() { super (RESERVED, null , null , null ); } }
哈希处理
基于key自身的哈希码,高低位异或融合高位信息
再去掉符号位保持正数,负数留作特殊用途
ConcurrentHashMap.java 1 2 3 4 5 static final int HASH_BITS = 0x7fffffff ;static final int spread (int h) { return (h ^ (h >>> 16 )) & HASH_BITS; }
get操作
get操作没有加锁
ConcurrentHashMap.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public V get (Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1 ) & h)) != null ) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0 ) return (p = e.find(h, key)) != null ? p.val : null ; while ((e = e.next) != null ) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null ; }
put操作
没有bin节点就CAS创建,找到bin节点后,在节点上加锁,进行后续查找定位及后续操作
新数据链表尾插入
ConcurrentHashMap.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node<K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new Node<K,V>(hash, key, value, null ); break ; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; }
扩容
原始数组迁移到2倍容量的新数组
Forwarding节点包括新数组的引用
如果原数组位置是null,变为Forwarding节点
如果原数组有节点,加锁后移动,完成后变为Forwarding节点
修改操作比如put/remove, 定位后发现forwarding节点,参与帮助扩容,扩容完成后新旧数组替换,可以再次尝试操作即操作扩容后新数组。整体思路是如果写操作发现正在扩容,那么应该阻塞等待,但是有点浪费,不如一起参与扩容操作
get操作如果发现数组位置有数据,说明数据还在直接查找返回,如果发现是Forwarding节点说明数据已经迁移完成,会通过Forwarding节点去新数组查询
维护sizeCtl进行容量控制,CAS操作
0表示还没初始化
-1表示正在初始化
-(1+N)表示有N个线程在协助扩容
参与扩容的线程负责连续的多个数组槽,可以根据已经参与的线程数决定是否需要加入帮助
https://www.cnblogs.com/nullzx/p/8647220.html
computeIfAbsent
不存在key时计算一个值插入,大体流程和put相似,由于value不是现成的可能需要时间计算,于是先插一个ReservationNode预定bin节点并加锁,这样其他线程可以知道这个位置将要插入数据
ConcurrentHashMap.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 public V computeIfAbsent (K key, Function<? super K, ? extends V> mappingFunction) { if (key == null || mappingFunction == null ) throw new NullPointerException(); int h = spread(key.hashCode()); V val = null ; int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & h)) == null ) { Node<K,V> r = new ReservationNode<K,V>(); synchronized (r) { if (casTabAt(tab, i, null , r)) { binCount = 1 ; Node<K,V> node = null ; try { if ((val = mappingFunction.apply(key)) != null ) node = new Node<K,V>(h, key, val, null ); } finally { setTabAt(tab, i, node); } } } if (binCount != 0 ) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { boolean added = false ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; V ev; if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { val = e.val; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { if ((val = mappingFunction.apply(key)) != null ) { added = true ; pred.next = new Node<K,V>(h, key, val, null ); } break ; } } } else if (f instanceof TreeBin) { binCount = 2 ; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(h, key, null )) != null ) val = p.val; else if ((val = mappingFunction.apply(key)) != null ) { added = true ; t.putTreeVal(h, key, val); } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (!added) return val; break ; } } } if (val != null ) addCount(1L , binCount); return val; }
计算数量
多线程环境下,计算大小只计算大概数量,没必要加锁中断操作
整体采用单变量+数组计数相结合的方式,数组可以分散并发压力
ConcurrentHashMap.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private transient volatile long baseCount;private transient volatile CounterCell[] counterCells;@sun .misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } } public int size () { long n = sumCount(); return ((n < 0L ) ? 0 : (n > (long )Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int )n); } final long sumCount () { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; }
put操作后会改变数量,先尝试单变量修改,不成功尝试维护数组
ConcurrentHashMap.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private final void addCount (long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this , BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return ; } if (check <= 1 ) return ; s = sumCount(); } }