0. 简介:

  • ThreadLocal 并不解决线程间共享数据的问题
  • ThreadLocal 通过隐式的在不同线程内创建独立实例副本避免了实例线程安全的问题;每个线程持有一个 Map 并维护了 ThreadLocal 对象与具体实例的映射,该 Map 由于只被持有它的线程访问,故不存在线程安全以及锁的问题
  • ThreadLocalMap 的 Entry 对 ThreadLocal 的引用为弱引用,避免了 ThreadLocal 对象无法被回收的问题
  • ThreadLocalMap 的 set 方法通过调用 replaceStaleEntry 方法回收键为 null 的 Entry 对象的值(即为具体实例)以及 Entry 对象本身从而防止内存泄漏
  • ThreadLocal 适用于变量在线程间隔离且在方法间共享的场景

1. 一个ThreadLocal的使用案例

反例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 日期工具类
*/
public class DateUtil {

private static final SimpleDateFormat simpleDateFormat =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public static Date parse(String dateString) {
Date date = null;
try {
date = simpleDateFormat.parse(dateString);
} catch (ParseException e) {
e.printStackTrace();
}
return date;
}


public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);

for (int i = 0; i < 10; i++) {
executorService.execute(()->{
System.out.println(DateUtil.parse("2022-07-24 16:34:30"));
});
}
executorService.shutdown();
}

运行结果:

image-20240329211101453

正例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 日期工具类
*/
public class DateUtil {

private static ThreadLocal<SimpleDateFormat> dateFormatThreadLocal =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

public static Date parse(String dateString) {
Date date = null;
try {
date = dateFormatThreadLocal.get().parse(dateString);
} catch (ParseException e) {
e.printStackTrace();
}
return date;
}

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);

for (int i = 0; i < 10; i++) {
executorService.execute(()->{
System.out.println(DateUtil.parse("2022-07-24 16:34:30"));
});
}
executorService.shutdown();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
Sun Jul 24 16:34:30 CST 2022
Sun Jul 24 16:34:30 CST 2022
Sun Jul 24 16:34:30 CST 2022
Sun Jul 24 16:34:30 CST 2022
Sun Jul 24 16:34:30 CST 2022
Sun Jul 24 16:34:30 CST 2022
Sun Jul 24 16:34:30 CST 2022
Sun Jul 24 16:34:30 CST 2022
Sun Jul 24 16:34:30 CST 2022
Sun Jul 24 16:34:30 CST 2022

Process finished with exit code 0

总结:反例中,为什么会报错呢?这是因为SimpleDateFormat不是线性安全的,它以共享变量出现时,并发多线程场景下即会报错。

为什么加了ThreadLocal就不会有问题呢?并发场景下,ThreadLocal是如何保证的呢?我们接下来看看ThreadLocal的核心原理。

2. ThreadLocal原理

2.1 内存结构图

img

  • Thread类中,有个ThreadLocal.ThreadLocalMap 的成员变量。

  • ThreadLocalMap内部维护了Entry数组,每个Entry代表一个完整的对象,keyThreadLocal本身,valueThreadLocal的泛型对象值。

2.2 源码分析

2.2.1 ThreadLocalMap

  1. 作为Thread的成员变量

    1
    2
    3
    4
    5
    public class Thread implements Runnable {
    //ThreadLocal.ThreadLocalMap是Thread的属性
    ThreadLocal.ThreadLocalMap threadLocals = null;
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    }
  2. 构造方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    static class ThreadLocalMap {

    static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
    super(k);
    value = v;
    }
    }
    //Entry数组
    private Entry[] table;

    // ThreadLocalMap的构造器,ThreadLocal作为key
    ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    table = new Entry[INITIAL_CAPACITY];
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    setThreshold(INITIAL_CAPACITY);
    }
    }
  3. ThreadLocalMap的hash算法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class ThreadLocal<T> {
    private final int threadLocalHashCode = nextHashCode();

    private static AtomicInteger nextHashCode = new AtomicInteger();

    private static final int HASH_INCREMENT = 0x61c88647;

    private static int nextHashCode() {
    return nextHashCode.getAndAdd(HASH_INCREMENT);
    }

    static class ThreadLocalMap {
    ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
    table = new Entry[INITIAL_CAPACITY];
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);

    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    setThreshold(INITIAL_CAPACITY);
    }
    }
    }

    每当创建一个ThreadLocal对象,这个ThreadLocal.nextHashCode 这个值就会增长 0x61c88647

    这个值很特殊,它是斐波那契数 也叫 黄金分割数hash增量为 这个数字,带来的好处就是 hash 分布非常均匀

    我们自己可以尝试下:

    YKbSGn.png

    可以看到产生的哈希码分布很均匀

  4. hash冲突

    注明: 下面示例图中,绿色块Entry代表正常数据灰色块代表Entrykey值为null已被垃圾回收白色块表示Entrynull

    Ynzr5D.png

    如上图所示,如果我们插入一个value=27的数据,通过hash计算后应该落入第4个槽位中,而槽位4已经有了Entry数据。

    此时就会线性向后查找,一直找到Entrynull的槽位才会停止查找,将当前元素放入此槽位中。当然迭代过程中还有其他的情况,比如遇到了Entry不为nullkey值相等的情况,还有Entry中的key值为null的情况等等都会有不同的处理,后面会一一详细讲解。

    这里还画了一个Entry中的keynull的数据(Entry=2的灰色块数据),因为key值是弱引用类型,所以会有这种数据存在。在set过程中,如果遇到了key过期的Entry数据,实际上是会进行一轮探测式清理操作的

  5. set方法

    • 图解:

    ThreadLocalMapset数据(新增或者更新数据)分为好几种情况。

    第一种情况: 通过hash计算后的槽位对应的Entry数据为空:

    YuSniD.png

    这里直接将数据放到该槽位即可。

    第二种情况: 槽位数据不为空,key值与当前ThreadLocal通过hash计算获取的key值一致:

    image.png

    这里直接更新该槽位的数据。

    第三种情况: 槽位数据不为空,往后遍历过程中,在找到Entrynull的槽位之前,没有遇到key过期的Entry

    image.png

    遍历散列数组,线性往后查找,如果找到Entrynull的槽位,则将数据放入该槽位中,或者往后遍历过程中,遇到了key值相等的数据,直接更新即可。

    第四种情况: 槽位数据不为空,往后遍历过程中,在找到Entrynull的槽位之前,遇到key过期的Entry,如下图,往后遍历过程中,一到了index=7的槽位数据Entrykey=null

    Yu77qg.png

    散列数组下标为7位置对应的Entry数据keynull,表明此数据key值已经被垃圾回收掉了,此时就会执行replaceStaleEntry()方法,该方法含义是替换过期数据的逻辑,以index=7位起点开始遍历,进行探测式数据清理工作

    replaceStaleEntry()方法: 替换过期数据

    初始化探测式清理过期数据扫描的开始位置:slotToExpunge(过期数据起始扫描下标) = staleSlot = 7

    首先:以当前staleSlot开始 向前迭代查找,找其他过期的数据,然后更新下标slotToExpungefor循环迭代,直到碰到Entrynull结束。如下图所示,slotToExpunge被更新为0

    YuHSMT.png

    上面向前迭代的操作是为了更新探测清理过期数据的起始下标slotToExpunge的值,它是用来判断当前过期槽位staleSlot之前是否还有过期元素。

    其次:接着开始以staleSlot位置(index=7) 向后迭代查找

    ​ a. 如果找到了相同key值的Entry数据:

    YuHEJ1.png

    从当前节点staleSlot向后查找key值相等的Entry元素,找到后更新Entry的值并交换staleSlot元素的位置(staleSlot位置为过期元素),更新Entry数据,然后开始进行过期Entry的清理工作,如下图所示:

    Yu4oWT.png

    ​ b. 如果没有找到相同key值的Entry数据:

    YuHMee.png

    从当前节点staleSlot向后查找key值相等的Entry元素,直到Entrynull则停止寻找。通过上图可知,此时table中没有key值相同的Entry

    创建新的Entry,替换table[stableSlot]位置:

    YuH3FA.png

    最后:替换完成后也是进行过期元素清理工作,清理工作主要是有两个方法:expungeStaleEntry()cleanSomeSlots()


    • 源码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    public void set(T value) {
    Thread t = Thread.currentThread(); //获取当前线程t
    ThreadLocalMap map = getMap(t); //根据当前线程获取到ThreadLocalMap
    if (map != null) //如果获取的ThreadLocalMap对象不为空
    map.set(this, value); //K,V设置到ThreadLocalMap中
    else
    createMap(t, value); //创建一个新的ThreadLocalMap
    }

    ThreadLocalMap getMap(Thread t) {
    return t.threadLocals; //返回Thread对象的ThreadLocalMap属性
    }

    private void set(ThreadLocal<?> key, Object value) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);

    for (Entry e = tab[i];
    e != null;
    e = tab[i = nextIndex(i, len)]) {
    ThreadLocal<?> k = e.get();

    if (k == key) {
    e.value = value;
    return;
    }

    if (k == null) {
    replaceStaleEntry(key, value, i);
    return;
    }
    }

    tab[i] = new Entry(key, value);
    int sz = ++size;
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
    rehash();
    }

    void createMap(Thread t, T firstValue) { //调用ThreadLocalMap的构造函数
    t.threadLocals = new ThreadLocalMap(this, firstValue); this表示当前类ThreadLocal
    }
  6. get方法

    • 图解:

    第一种情况: 通过查找key值计算出散列表中slot位置,然后该slot位置中的Entry.key和查找的key一致,则直接返回:

    YuWfdx.png

    第二种情况: slot位置中的Entry.key和要查找的key不一致:

    YuWyz4.png

    我们以get(ThreadLocal1)为例,通过hash计算后,正确的slot位置应该是4,而index=4的槽位已经有了数据,且key值不等于ThreadLocal1,所以需要继续往后迭代查找。

    迭代到index=5的数据时,此时Entry.key=null,触发一次探测式数据回收操作,执行expungeStaleEntry()方法,执行完后,index 5,8的数据都会被回收,而index 6,7的数据都会前移,此时继续往后迭代,到index = 6的时候即找到了key值相等的Entry数据,如下图所示:

    YuW8JS.png

    • 源码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public T get() {
    Thread t = Thread.currentThread();//获取当前线程t
    ThreadLocalMap map = getMap(t);//根据当前线程获取到ThreadLocalMap
    if (map != null) { //如果获取的ThreadLocalMap对象不为空
    //由this(即ThreadLoca对象)得到对应的Value,即ThreadLocal的泛型值
    ThreadLocalMap.Entry e = map.getEntry(this);
    if (e != null) {
    @SuppressWarnings("unchecked")
    T result = (T)e.value;
    return result;
    }
    }
    return setInitialValue(); //初始化threadLocals成员变量的值
    }

    private T setInitialValue() {
    T value = initialValue(); //初始化value的值
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t); //以当前线程为key,获取threadLocals成员变量,它是一个ThreadLocalMap
    if (map != null)
    map.set(this, value); //K,V设置到ThreadLocalMap中
    else
    createMap(t, value); //实例化threadLocals成员变量
    return value;
    }
  7. 过期key的探测式清理流程

    • 探测式清理:

    expungeStaleEntry方法,遍历散列数组,从开始位置向后探测清理过期数据,将过期数据的Entry设置为null,沿途中碰到未过期的数据则将此数据rehash后重新在table数组中定位,如果定位的位置已经有了数据,则会将未过期的数据放到最靠近此位置的Entry=null的桶中,使rehash后的Entry数据距离正确的桶的位置更近一些。操作逻辑如下:

    YuH2OU.png

    如上图,set(27) 经过hash计算后应该落到index=4的桶中,由于index=4桶已经有了数据,所以往后迭代最终数据放入到index=7的桶中,放入后一段时间后index=5中的Entry数据key变为了null

    YuHb6K.png

    如果再有其他数据setmap中,就会触发探测式清理操作。

    如上图,执行探测式清理后,index=5的数据被清理掉,继续往后迭代,到index=7的元素时,经过rehash后发现该元素正确的index=4,而此位置已经已经有了数据,往后查找离index=4最近的Entry=null的节点(刚被探测式清理掉的数据:index=5),找到后移动index= 7的数据到index=5中,此时桶的位置离正确的位置index=4更近了。

    经过一轮探测式清理后,key过期的数据会被清理掉,没过期的数据经过rehash重定位后所处的桶位置理论上更接近i= key.hashCode & (tab.len - 1)的位置。这种优化会提高整个散列表查询性能。

    接着看下expungeStaleEntry()具体流程,我们还是以先原理图后源码讲解的方式来一步步梳理:

    Yuf301.png

    我们假设expungeStaleEntry(3) 来调用此方法,如上图所示,我们可以看到ThreadLocalMaptable的数据情况,接着执行清理操作:

    YufupF.png

    第一步是清空当前staleSlot位置的数据,index=3位置的Entry变成了null。然后接着往后探测:

    YufAwq.png

    执行完第二步后,index=4的元素挪到index=3的槽位中。

    继续往后迭代检查,碰到正常数据,计算该数据位置是否偏移,如果被偏移,则重新计算slot位置,目的是让正常数据尽可能存放在正确位置或离正确位置更近的位置

    YuWjTP.png

    在往后迭代的过程中碰到空的槽位,终止探测,这样一轮探测式清理工作就完成了,接着我们继续看看具体实现源代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;

    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    size--;

    Entry e;
    int i;
    for (i = nextIndex(staleSlot, len);
    (e = tab[i]) != null;
    i = nextIndex(i, len)) {
    ThreadLocal<?> k = e.get();
    if (k == null) {
    e.value = null;
    tab[i] = null;
    size--;
    } else {
    int h = k.threadLocalHashCode & (len - 1);
    if (h != i) {
    tab[i] = null;

    while (tab[h] != null)
    h = nextIndex(h, len);
    tab[h] = e;
    }
    }
    }
    return i;
    }

    这里我们还是以staleSlot=3 来做示例说明,首先是将tab[staleSlot]槽位的数据清空,然后设置size-- 接着以staleSlot位置往后迭代,如果遇到k==null的过期数据,也是清空该槽位数据,然后size--

    1
    2
    3
    4
    5
    6
    7
    ThreadLocal<?> k = e.get();

    if (k == null) {
    e.value = null;
    tab[i] = null;
    size--;
    }

    如果key没有过期,重新计算当前key的下标位置是不是当前槽位下标位置,如果不是,那么说明产生了hash冲突,此时以新计算出来正确的槽位位置往后迭代,找到最近一个可以存放entry的位置。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    int h = k.threadLocalHashCode & (len - 1);
    if (h != i) {
    tab[i] = null;

    while (tab[h] != null)
    h = nextIndex(h, len);

    tab[h] = e;
    }

    这里是处理正常的产生Hash冲突的数据,经过迭代后,有过Hash冲突数据的Entry位置会更靠近正确位置,这样的话,查询的时候 效率才会更高。

    • 启发式清理

    启发式清理被作者定义为:Heuristically scan some cells looking for stale entries.
    YK5HJ0.png

    源码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {
    i = nextIndex(i, len);
    Entry e = tab[i];
    if (e != null && e.get() == null) {
    n = len;
    removed = true;
    i = expungeStaleEntry(i);
    }
    } while ( (n >>>= 1) != 0);
    return removed;
    }

  8. 扩容方法

    ThreadLocalMap.set()方法的最后,如果执行完启发式清理工作后,未清理到任何数据,且当前散列数组中Entry的数量已经达到了列表的扩容阈值(len*2/3),就开始执行rehash()逻辑:

    1
    2
    3
    // entry数组的数量达到了扩容阈值
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
    rehash();

    接着看下rehash()具体实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    private void rehash() {
    // 进行探测式清理
    expungeStaleEntries();

    if (size >= threshold - threshold / 4)
    resize();
    }

    private void expungeStaleEntries() {
    Entry[] tab = table;
    int len = tab.length;
    for (int j = 0; j < len; j++) {
    Entry e = tab[j];
    if (e != null && e.get() == null)
    expungeStaleEntry(j);
    }
    }

    这里首先是会进行探测式清理工作,从table的起始位置往后清理。清理完成之后,table中可能有一些keynullEntry数据被清理掉,所以此时通过判断size >= threshold - threshold / 4 也就是size >= threshold* 3/4 来决定是否扩容。

    注意:rehash()的阈值是size >= threshold

    YuqwPs.png

    接着看看具体的resize()方法,为了方便演示,我们以oldTab.len=8来举例:

    Yu2QOI.png

    扩容后的tab的大小为oldLen * 2,然后遍历老的散列表,重新计算hash位置,然后放到新的tab数组中,如果出现**hash冲突则往后寻找最近的entrynull的槽位,遍历完成之后,oldTab中所有的entry数据都已经放入到新的tab中了。重新计算tab下次扩容的阈值**,具体代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    private void resize() {
    Entry[] oldTab = table;
    int oldLen = oldTab.length;
    int newLen = oldLen * 2;
    Entry[] newTab = new Entry[newLen];
    int count = 0;

    for (int j = 0; j < oldLen; ++j) {
    Entry e = oldTab[j];
    if (e != null) {
    ThreadLocal<?> k = e.get();
    if (k == null) {
    e.value = null;
    } else {
    int h = k.threadLocalHashCode & (newLen - 1);
    while (newTab[h] != null)
    h = nextIndex(h, newLen);
    newTab[h] = e;
    count++;
    }
    }
    }

    setThreshold(newLen);
    size = count;
    table = newTab;
    }

2.3.2 InheritableThreadLocal

我们使用ThreadLocal的时候,在异步场景下是无法给子线程共享父线程中创建的线程副本数据的。

为了解决这个问题,JDK中还有一个InheritableThreadLocal类,我们来看一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class InheritableThreadLocalDemo {
public static void main(String[] args) {
ThreadLocal<String> threadLocal = new ThreadLocal<>();
ThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();
threadLocal.set("父类数据:threadLocal");
inheritableThreadLocal.set("父类数据:inheritableThreadLocal");

new Thread(new Runnable() {
@Override
public void run() {
System.out.println("子线程获取父类threadLocal数据:" + threadLocal.get());
System.out.println("子线程获取父类inheritableThreadLocal数据:" + inheritableThreadLocal.get());
}
}).start();
}
}

打印结果:

1
2
java复制代码子线程获取父类threadLocal数据:null
子线程获取父类inheritableThreadLocal数据:父类数据:inheritableThreadLocal

可以发现,在子线程中,是可以获取到父线程的 InheritableThreadLocal 类型变量的值,但是不能获取到 ThreadLocal 类型变量的值。

获取不到ThreadLocal 类型的值,我们可以好理解,因为它是线程隔离的嘛。InheritableThreadLocal 是如何做到的呢?原理是什么呢?

实现原理是子线程是通过在父线程中通过调用new Thread()方法来创建子线程,Thread#init方法在Thread的构造方法中被调用。在init方法中拷贝父线程数据到子线程中:

1
2
3
4
5
6
7
8
9
10
11
12
13
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}

if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
this.stackSize = stackSize;
tid = nextThreadID();
}

InheritableThreadLocal仍然有缺陷,一般我们做异步化处理都是使用的线程池,而InheritableThreadLocal是在new Thread中的init()方法给赋值的,而线程池是线程复用的逻辑,所以这里会存在问题。

3. 应用

3.1 用户全局token

  • 使用场景
    前端请求带有用户的token,全局获取用户信息
  • 原理实现
    自定义拦截器,通过token获取用户信息,将用户信息放入threadlocal

3.2 MDC全局链路id

  • 使用场景
    1. 系统traceId链路调用
    2. 系统间调用traceId传递调用
    3. 基于线程池的父子线程traceId传递
    4. 消息队列场景下traceId链路追踪
  • 原理实现
    1. 收到前端请求后生成uuid代表traceId
    2. 通过feign调用,调用Header中传递traceId ;发送方自定义实现RequestInterceptor
    3. 自定义ThreadPoolExecutor;重写run方法,创建子线程的时候将父线程的数据信息拷贝到子线程
    4. 消息队列中添加traceId属性

本站由 卡卡龙 使用 Stellar 1.29.1主题创建

本站访问量 次. 本文阅读量 次.