0. 简介:
- ThreadLocal 并不解决线程间共享数据的问题
- ThreadLocal 通过隐式的在不同线程内创建独立实例副本避免了实例线程安全的问题;每个线程持有一个 Map 并维护了 ThreadLocal 对象与具体实例的映射,该 Map 由于只被持有它的线程访问,故不存在线程安全以及锁的问题
- ThreadLocalMap 的 Entry 对 ThreadLocal 的引用为弱引用,避免了 ThreadLocal 对象无法被回收的问题
- ThreadLocalMap 的 set 方法通过调用 replaceStaleEntry 方法回收键为 null 的 Entry 对象的值(即为具体实例)以及 Entry 对象本身从而防止内存泄漏
- ThreadLocal 适用于变量在线程间隔离且在方法间共享的场景
1. 一个ThreadLocal的使用案例
反例:
1 | /** |
运行结果:
正例:
1 | /** |
运行结果:
1 | Sun Jul 24 16:34:30 CST 2022 |
总结:反例中,为什么会报错呢?这是因为SimpleDateFormat
不是线性安全的,它以共享变量出现时,并发多线程场景下即会报错。
为什么加了ThreadLocal
就不会有问题呢?并发场景下,ThreadLocal
是如何保证的呢?我们接下来看看ThreadLocal
的核心原理。
2. ThreadLocal原理
2.1 内存结构图
Thread
类中,有个ThreadLocal.ThreadLocalMap
的成员变量。ThreadLocalMap
内部维护了Entry
数组,每个Entry
代表一个完整的对象,key
是ThreadLocal
本身,value
是ThreadLocal
的泛型对象值。
2.2 源码分析
2.2.1 ThreadLocalMap
作为Thread的成员变量
1
2
3
4
5public class Thread implements Runnable {
//ThreadLocal.ThreadLocalMap是Thread的属性
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
//Entry数组
private Entry[] table;
// ThreadLocalMap的构造器,ThreadLocal作为key
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
}ThreadLocalMap的hash算法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class ThreadLocal<T> {
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
static class ThreadLocalMap {
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
}
}每当创建一个
ThreadLocal
对象,这个ThreadLocal.nextHashCode
这个值就会增长0x61c88647
。这个值很特殊,它是斐波那契数 也叫 黄金分割数。
hash
增量为 这个数字,带来的好处就是hash
分布非常均匀。我们自己可以尝试下:
可以看到产生的哈希码分布很均匀
hash冲突
注明: 下面示例图中,绿色块
Entry
代表正常数据,灰色块代表Entry
的key
值为null
,已被垃圾回收。白色块表示Entry
为null
。如上图所示,如果我们插入一个
value=27
的数据,通过hash
计算后应该落入第4个槽位中,而槽位4已经有了Entry
数据。此时就会线性向后查找,一直找到
Entry
为null
的槽位才会停止查找,将当前元素放入此槽位中。当然迭代过程中还有其他的情况,比如遇到了Entry
不为null
且key
值相等的情况,还有Entry
中的key
值为null
的情况等等都会有不同的处理,后面会一一详细讲解。这里还画了一个
Entry
中的key
为null
的数据(Entry=2的灰色块数据),因为key
值是弱引用类型,所以会有这种数据存在。在set
过程中,如果遇到了key
过期的Entry
数据,实际上是会进行一轮探测式清理操作的set方法
- 图解:
往
ThreadLocalMap
中set
数据(新增或者更新数据)分为好几种情况。第一种情况: 通过
hash
计算后的槽位对应的Entry
数据为空:这里直接将数据放到该槽位即可。
第二种情况: 槽位数据不为空,
key
值与当前ThreadLocal
通过hash
计算获取的key
值一致:这里直接更新该槽位的数据。
第三种情况: 槽位数据不为空,往后遍历过程中,在找到
Entry
为null
的槽位之前,没有遇到key
过期的Entry
:遍历散列数组,线性往后查找,如果找到
Entry
为null
的槽位,则将数据放入该槽位中,或者往后遍历过程中,遇到了key值相等的数据,直接更新即可。第四种情况: 槽位数据不为空,往后遍历过程中,在找到
Entry
为null
的槽位之前,遇到key
过期的Entry
,如下图,往后遍历过程中,一到了index=7
的槽位数据Entry
的key=null
:散列数组下标为7位置对应的
Entry
数据key
为null
,表明此数据key
值已经被垃圾回收掉了,此时就会执行replaceStaleEntry()
方法,该方法含义是替换过期数据的逻辑,以index=7位起点开始遍历,进行探测式数据清理工作。replaceStaleEntry()方法: 替换过期数据
初始化探测式清理过期数据扫描的开始位置:
slotToExpunge(过期数据起始扫描下标) = staleSlot = 7
首先:以当前
staleSlot
开始 向前迭代查找,找其他过期的数据,然后更新下标slotToExpunge
。for
循环迭代,直到碰到Entry
为null
结束。如下图所示,slotToExpunge被更新为0:上面向前迭代的操作是为了更新探测清理过期数据的起始下标
slotToExpunge
的值,它是用来判断当前过期槽位staleSlot
之前是否还有过期元素。其次:接着开始以
staleSlot
位置(index=7) 向后迭代查找, a. 如果找到了相同key值的Entry数据:
从当前节点
staleSlot
向后查找key
值相等的Entry
元素,找到后更新Entry
的值并交换staleSlot
元素的位置(staleSlot
位置为过期元素),更新Entry
数据,然后开始进行过期Entry
的清理工作,如下图所示: b. 如果没有找到相同key值的Entry数据:
从当前节点
staleSlot
向后查找key
值相等的Entry
元素,直到Entry
为null
则停止寻找。通过上图可知,此时table
中没有key
值相同的Entry
。创建新的
Entry
,替换table[stableSlot]
位置:最后:替换完成后也是进行过期元素清理工作,清理工作主要是有两个方法:
expungeStaleEntry()
和cleanSomeSlots()
- 源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43public void set(T value) {
Thread t = Thread.currentThread(); //获取当前线程t
ThreadLocalMap map = getMap(t); //根据当前线程获取到ThreadLocalMap
if (map != null) //如果获取的ThreadLocalMap对象不为空
map.set(this, value); //K,V设置到ThreadLocalMap中
else
createMap(t, value); //创建一个新的ThreadLocalMap
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals; //返回Thread对象的ThreadLocalMap属性
}
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
return;
}
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
void createMap(Thread t, T firstValue) { //调用ThreadLocalMap的构造函数
t.threadLocals = new ThreadLocalMap(this, firstValue); this表示当前类ThreadLocal
}get方法
- 图解:
第一种情况: 通过查找
key
值计算出散列表中slot
位置,然后该slot
位置中的Entry.key
和查找的key
一致,则直接返回:第二种情况:
slot
位置中的Entry.key
和要查找的key
不一致:我们以
get(ThreadLocal1)
为例,通过hash
计算后,正确的slot
位置应该是4,而index=4
的槽位已经有了数据,且key
值不等于ThreadLocal1
,所以需要继续往后迭代查找。迭代到
index=5
的数据时,此时Entry.key=null
,触发一次探测式数据回收操作,执行expungeStaleEntry()
方法,执行完后,index 5,8
的数据都会被回收,而index 6,7
的数据都会前移,此时继续往后迭代,到index = 6
的时候即找到了key
值相等的Entry
数据,如下图所示:- 源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public T get() {
Thread t = Thread.currentThread();//获取当前线程t
ThreadLocalMap map = getMap(t);//根据当前线程获取到ThreadLocalMap
if (map != null) { //如果获取的ThreadLocalMap对象不为空
//由this(即ThreadLoca对象)得到对应的Value,即ThreadLocal的泛型值
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
T result = (T)e.value;
return result;
}
}
return setInitialValue(); //初始化threadLocals成员变量的值
}
private T setInitialValue() {
T value = initialValue(); //初始化value的值
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t); //以当前线程为key,获取threadLocals成员变量,它是一个ThreadLocalMap
if (map != null)
map.set(this, value); //K,V设置到ThreadLocalMap中
else
createMap(t, value); //实例化threadLocals成员变量
return value;
}过期key的探测式清理流程
- 探测式清理:
expungeStaleEntry
方法,遍历散列数组,从开始位置向后探测清理过期数据,将过期数据的Entry
设置为null
,沿途中碰到未过期的数据则将此数据rehash
后重新在table
数组中定位,如果定位的位置已经有了数据,则会将未过期的数据放到最靠近此位置的Entry=null
的桶中,使rehash
后的Entry
数据距离正确的桶的位置更近一些。操作逻辑如下:如上图,
set(27)
经过hash计算后应该落到index=4
的桶中,由于index=4
桶已经有了数据,所以往后迭代最终数据放入到index=7
的桶中,放入后一段时间后index=5
中的Entry
数据key
变为了null
如果再有其他数据
set
到map
中,就会触发探测式清理操作。如上图,执行探测式清理后,
index=5
的数据被清理掉,继续往后迭代,到index=7
的元素时,经过rehash
后发现该元素正确的index=4
,而此位置已经已经有了数据,往后查找离index=4
最近的Entry=null
的节点(刚被探测式清理掉的数据:index=5),找到后移动index= 7
的数据到index=5
中,此时桶的位置离正确的位置index=4
更近了。经过一轮探测式清理后,
key
过期的数据会被清理掉,没过期的数据经过rehash
重定位后所处的桶位置理论上更接近i= key.hashCode & (tab.len - 1)
的位置。这种优化会提高整个散列表查询性能。接着看下
expungeStaleEntry()
具体流程,我们还是以先原理图后源码讲解的方式来一步步梳理:我们假设
expungeStaleEntry(3)
来调用此方法,如上图所示,我们可以看到ThreadLocalMap
中table
的数据情况,接着执行清理操作:第一步是清空当前
staleSlot
位置的数据,index=3
位置的Entry
变成了null
。然后接着往后探测:执行完第二步后,index=4的元素挪到index=3的槽位中。
继续往后迭代检查,碰到正常数据,计算该数据位置是否偏移,如果被偏移,则重新计算
slot
位置,目的是让正常数据尽可能存放在正确位置或离正确位置更近的位置在往后迭代的过程中碰到空的槽位,终止探测,这样一轮探测式清理工作就完成了,接着我们继续看看具体实现源代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}这里我们还是以
staleSlot=3
来做示例说明,首先是将tab[staleSlot]
槽位的数据清空,然后设置size--
接着以staleSlot
位置往后迭代,如果遇到k==null
的过期数据,也是清空该槽位数据,然后size--
1
2
3
4
5
6
7ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
tab[i] = null;
size--;
}如果
key
没有过期,重新计算当前key
的下标位置是不是当前槽位下标位置,如果不是,那么说明产生了hash
冲突,此时以新计算出来正确的槽位位置往后迭代,找到最近一个可以存放entry
的位置。1
2
3
4
5
6
7
8
9int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}这里是处理正常的产生
Hash
冲突的数据,经过迭代后,有过Hash
冲突数据的Entry
位置会更靠近正确位置,这样的话,查询的时候 效率才会更高。- 启发式清理
启发式清理被作者定义为:Heuristically scan some cells looking for stale entries.
源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}扩容方法
在
ThreadLocalMap.set()
方法的最后,如果执行完启发式清理工作后,未清理到任何数据,且当前散列数组中Entry
的数量已经达到了列表的扩容阈值(len*2/3)
,就开始执行rehash()
逻辑:1
2
3// entry数组的数量达到了扩容阈值
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();接着看下
rehash()
具体实现:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17private void rehash() {
// 进行探测式清理
expungeStaleEntries();
if (size >= threshold - threshold / 4)
resize();
}
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}这里首先是会进行探测式清理工作,从
table
的起始位置往后清理。清理完成之后,table
中可能有一些key
为null
的Entry
数据被清理掉,所以此时通过判断size >= threshold - threshold / 4
也就是size >= threshold* 3/4
来决定是否扩容。注意:
rehash()
的阈值是size >= threshold
接着看看具体的
resize()
方法,为了方便演示,我们以oldTab.len=8
来举例:扩容后的
tab
的大小为oldLen * 2
,然后遍历老的散列表,重新计算hash
位置,然后放到新的tab
数组中,如果出现**hash
冲突则往后寻找最近的entry
为null
的槽位,遍历完成之后,oldTab
中所有的entry
数据都已经放入到新的tab
中了。重新计算tab
下次扩容的阈值**,具体代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
} else {
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
setThreshold(newLen);
size = count;
table = newTab;
}
2.3.2 InheritableThreadLocal
我们使用ThreadLocal
的时候,在异步场景下是无法给子线程共享父线程中创建的线程副本数据的。
为了解决这个问题,JDK中还有一个InheritableThreadLocal
类,我们来看一个例子:
1 | public class InheritableThreadLocalDemo { |
打印结果:
1 | java复制代码子线程获取父类threadLocal数据:null |
可以发现,在子线程中,是可以获取到父线程的 InheritableThreadLocal
类型变量的值,但是不能获取到 ThreadLocal
类型变量的值。
获取不到ThreadLocal
类型的值,我们可以好理解,因为它是线程隔离的嘛。InheritableThreadLocal
是如何做到的呢?原理是什么呢?
实现原理是子线程是通过在父线程中通过调用new Thread()
方法来创建子线程,Thread#init
方法在Thread
的构造方法中被调用。在init
方法中拷贝父线程数据到子线程中:
1 | private void init(ThreadGroup g, Runnable target, String name, |
但InheritableThreadLocal
仍然有缺陷,一般我们做异步化处理都是使用的线程池,而InheritableThreadLocal
是在new Thread
中的init()
方法给赋值的,而线程池是线程复用的逻辑,所以这里会存在问题。
3. 应用
3.1 用户全局token
- 使用场景
前端请求带有用户的token,全局获取用户信息 - 原理实现
自定义拦截器,通过token获取用户信息,将用户信息放入threadlocal
3.2 MDC全局链路id
- 使用场景
- 系统traceId链路调用
- 系统间调用traceId传递调用
- 基于线程池的父子线程traceId传递
- 消息队列场景下traceId链路追踪
- 原理实现
- 收到前端请求后生成uuid代表traceId
- 通过feign调用,调用Header中传递traceId ;发送方自定义实现RequestInterceptor
- 自定义ThreadPoolExecutor;重写run方法,创建子线程的时候将父线程的数据信息拷贝到子线程
- 消息队列中添加traceId属性