java并发
totvsraccq
.1. 并发/并行/串行、进程/线程区别?创建线程方式?√run和start方法区别?√停止线程方法?线程运行时异常会怎么样?主线程能捕获子线程异常吗?√线程调度方法?sleep()和wait()区别?√ 线程状态及转换?线程上下文切换概念?守护/用户线程区别?线程通信方式?并发编程三要素?线程安全概念?原因?解决方法?线程同步实现方式?
单核处理器轮流执行任务;多核或多个处理器同时执行多个任务;多个任务按顺序执行
进程是程序运行和操作系统资源分配的基本单位,线程是cpu调度和执行的基本单位
继承Thread并重写run方法;实现Runnable接口并实现run方法;通过Callable和FutureTask
public class MyCallable implements Callable<Integer>{
@Override
public Integer call() { return 1; }
public static void main(String[] args) throws ExecutionException, InterruptedException{
FutureTask<Integer> futureTask = new FutureTask<Integer>(new MyCallable());
Thread thread = new Thread(futureTask);
thread.start();
System.out.println("结果 " + futureTask.get());//调用后会阻塞主进程的继续往下执行
}
}
start方法使线程进入就绪状态并在得到CPU时间后执行run方法;run方法普通方法调用
使用volatile标志位/interrupt方法中断线程。在线程的run方法中while检查标志位\中断状态
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {//响应中断
// 执行任务
}
} catch (InterruptedException e) {
// 线程被中断时的清理代码
} finally {
// 线程结束前的清理代码
}
}
如果异常没有捕获,线程会停止执行并释放持有的对象监视器
不能。主线程可用Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler())捕获子线程异常
Object类
wait():当线程A调用共享变量的wait方法时会被阻塞挂起,直到其他线程调用共享变量的notify/notifyAll方法或者调用线程A的interrupt()方法,线程A抛出InterruptedException异常才返回。
wait(long timeout):当线程A没有在timeout时间内被其它线程唤醒则超时返回
notify()/notifyAll():随机唤醒一个/唤醒所有在共享变量上调用wait系列方法后被挂起的线程
Thread类
join():线程A执行thread.join()时会等待thread线程的run方法执行完成后才继续执行
sleep(long millis)
yield():静态方法,让出CPU,有可能立马又得到CPU调度
interrupt():中断线程并设置中断标志为true,使wait、join、sleep方法抛出InterruptedException,不停止线程。需要捕获异常处理
isInterrupted():检测线程中断。不清除中断标志位
interrupted():检测线程中断,清除中断标志位。线程被中断时只会第一次返回true
wait、notify范式?
// 等待方
synchronized(对象){
//为什么被通知后仍要检查条件?防止多线程情况下,都从wait()返回时条件不满足导致程序出错
while(条件不满足){
对象.wait();
}
处理逻辑
}
// 通知方
synchronized(对象){
改变条件
对象.notifyAll();
}
wait是Object实例方法,sleep是Thread的静态方法
wait必须在同步方法或者同步块中调用(必须已获得对象锁)。否则抛出IllegalMonitorStateException,sleep没有限制
都让出CPU,wait释放占有的对象锁。sleep不会释放掉对象锁
wait必须等待Object.notify/notifyAll通知或者wait()等待时间到期后获得CPU时间片继续执行,sleep在休眠时间达到后获得CPU时间片继续执行
new:初始状态,线程被创建
runnnable:运行状态,java将操作系统的就绪和运行两种状态统称为运行中
blocked:阻塞状态,线程阻塞与锁
waiting:等待状态,线程等待其他线程通知或者中断
time_waiting:超时等待状态,在指定时间返回
terminated:终止状态,线程执行完毕
状态切换图
![threadstatus.png](https://b.bdstatic.com/comment/Y_bZHaS27NSYIAE9PqRzMw544670c977c7892220f2b5cce5a6c4e5.png)
操作系统在多个线程之间切换执行时,保存当前线程的执行状态并恢复另一个线程的执行状态的过程。
User:运行在前台执行具体任务,如main线程
Daemon:运行在后台为前台线程服务。子线程也是守护线程,在start()前设置setDaemon(true)为守护线程避免抛异常,如垃圾回收线程
区别:程序运行完毕,JVM会等待用户线程完成后关闭,但不会等待守护线程完成,因此守护线程中不能用finally确保关闭或清理资源
Thread.join、interrupt()和isInterrupted()
BlockingQueue
CountDownLatch、CyclicBarrier、Semaphore等并发工具类
Object wait、notify、notifyAll
ReentrantLock + Condition
Exchangertbcore
原子性:一个操作或一组操作要么全部执行成功,要么完全不执行,不会被其他线程干扰
可见性:一个线程对共享变量的修改,其他线程能立即知道
有序性:程序执行的顺序按照代码的先后顺序执行
多个线程同时访问共享资源时,不会产生数据不一致或其他不可预期的错误
DateFormat实现,包括SimpleDateFormat都不是线程安全的,推荐用joda-time
原因:主内存和工作内存数据不一致和重排序导致表现全局变量及静态变量同时读写tvsracc
线程安全实现方式
同步机制:
互斥锁mutex:在访问共享资源前先对互斥量进行加锁,访问完后再解锁。对互斥量加锁后,任何其他试图再次对互斥量加锁的线程都会被阻塞,直到当前线程解锁。
读写锁:读加读锁、写加写锁;读锁与读锁不互斥,写锁与写锁、写锁与读锁互斥。适合读多写少
条件变量:允许线程在满足特定条件时才继续执行,否则进入等待状态。通常与互斥量一起使用,防止竞争条件的发生
自旋锁:不会让线程进入睡眠状态,而是一直循环检测锁是否被释放。用于锁的持有时间非常短的情况
信号量:为多个进程提供共享数据对象的访问弧度条自信
原子类
并发集合
ThreadLocal
CountDownLatch、CyclicBarrier、Semaphore等工具类
Fork/Join框架
.2. ThreadLocal理解?场景?原理?ThreadLocalMap怎么解决Hash冲突?扩容机制?为什么key设计成弱引用?内存泄露问题及解决方案?父子线程怎么共享数据?原理?
线程局部变量类。为每个线程提供独立的变量副本,彼此之间无法共享,保证线程安全
存储用户信息上下文。用户登录后每次请求头中携带token,在控制层拦截请求并根据token解析出用户信息。存入ThreadLocal,以便在其他地方使用
public class UserContext implements AutoCloseable {
static final ThreadLocal<String> ctx = new ThreadLocal<>();
public UserContext(String user) { ctx.set(user); }
public static String currentUser() { return ctx.get(); }
@Override
public void close() { ctx.remove(); }
}
try (var ctx = new UserContext("Bob")) {
// 可任意调用UserContext.currentUser():
String currentUser = UserContext.currentUser();
} finally {
//调用UserContext.close()方法释放ThreadLocal关联对象
}
private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() {
public Connection initialValue() { return DriverManager.getConnection(DB_URL); }
};
public static Connection getConnection() { return connectionHolder.get(); }
每个线程维护自己的数据库连接,不会出现A线程关了B线程正在使用的JDBC连接
确保事务在同一个线程中传播
ThreadLocal通过ThreadLocalMap实现线程间的数据隔离
~是ThreadLocal的静态内部类,线程调用ThreadLocal的set或get方法时,实际上是访问线程的~
ThreadLocalMap包含Entry数组,Entry键值对继承了WeakReference,key是ThreadLocal的弱引用,value是ThreadLocal的泛型值
ThreadLocal源码
public class Thread implements Runnable {
//懒加载:由ThreadLocal的set/get方法中调用createMap方法初始化
ThreadLocal.ThreadLocalMap threadLocals = null;
}
public class ThreadLocal<T> {
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
T result = (T)e.value;
return result;
}
}
//map==null则调用createMap初始化t.threadLocals
return setInitialValue();
}
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null) {
m.remove(this);
}
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
private Entry[] table;
}
}
开放定址法-线性探测
哈希取余法:ThredLocal的threadLocalHashCode和table数组长度减一&与运算(等价取模运算)。得到table数组下标,threadLocalHashCode从0开始,每次创建ThredLocal递增0x61c88647斐波那契数1.168。使得hash分布均匀
插入数据时通过hash计算得到数组下标,如果对应位置有数据且Entry数据的key和当前不相等。就会线性+1向后查找直到找到空槽位,把元素放到空槽中
获取数据时通过hash计算得到数组下标,如果对应位置有数据且Entry数组的key和get方法参数key相等则直接返回,不相等则线性+1查找下一个位置
set、resize源码
private static AtomicInteger nextHashCode = new AtomicInteger();
private final int threadLocalHashCode = nextHashCode();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.refersTo(key)) {
e.value = value;
return;
}
if (e.refersTo(null)) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);//+1
Entry e = tab[i];
if (e != null && e.refersTo(null)) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
private void rehash() {
//清理过期Entry
expungeStaleEntries();
if (size >= threshold - threshold / 4)
resize();
}
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j);//清除j及j关联的hash冲突链上的空Entry并调换2者位置
}
}
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
for (Entry e : oldTab) {
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
} else {
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
setThreshold(newLen);
size = count;
table = newTab;
}
ThreadLocalMap.set()时,如果执行cleanSomeSlots启发式清除部分失效Entry(从i开始扫描log2(n)个)个数为0,Entry对象数量大于等于扩容阈值len2/3执行
rehash()清理过期的Entry,然后判断size大于等于threshold 3/4 是否扩容。
resize()扩容后新数组的大小为老数组的两倍,然后遍历老数组,散列方法重新计算位置,开放地址解决冲突,然后放到新数组,遍历完成之后,然后table引用指向新数组
为了防止内存泄漏。保证ThreadLocal没有强引用或者内存不足时回收key,ThreadLocal值行set、get时会将key为null的Entry进行清理
ThreadLocal的ThreadLocalMap通常会随着线程结束被回收,避免内存泄漏。但如果线程一直在运行,并且Entry.value一直被强引用,那么就不会被回收,导致内存泄漏。当Entry非常多时导致内存溢出
数据污染:如果不finally清除,则下次访问TheadLocal继续留值
每次使用完ThreadLocal在finally调用remove方法清除
父线程使用InheritableThreadLocal来给子线程传值
线程初始化时如果父线程的inheritableThreadLocals不为空,则赋值给当前线程的inheritableThreadLocals
实例
public class InheritableThreadLocalTest {
public static void main(String[] args) {
final ThreadLocal threadLocal = new InheritableThreadLocal();
threadLocal.set("b");// 主线程
Thread t = new Thread() {//子线程
@Override
public void run() {
super.run();
System.out.println("a" + threadLocal.get());
}
};
t.start();
}
}
public class Thread {
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
private Thread(...){
...
Thread parent = currentThread();
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
...
}
}
.3. JMM内存模型概念?为什么线程要用自己的内存?指令重排?happens-before、as-if-serial了解吗?单线程的程序一定是顺序的吗?
Java Memory Model用来定义多线程中变量的访问规则的抽象模型,用来解决变量的可见性、有序性和原子性问题,确保线程安全
JMM规定共享变量存储在主内存(Main Memory)中。每条线程都有私有的本地内存(Local Memory),本地内存中存储了共享变量的副本,用来进行线程内部的读写操作。不同的线程之间不能互相访问本地内存中的变量,线程间的变量值的传递均需要通过主内存来完成。因此线程之间的变量变得不可见
当线程更改了本地内存中共享变量的副本后,它需要将这些更改刷新到主内存中,以确保其他线程可以看到这些更改。
当线程读取共享变量时首先从本地内存中读取。如果本地内存中的副本是过时的,将从主内存中重新加载共享变量的最新值到本地内存中(抽象概念,并不真实存在。对应CPU L1、L2、L3缓存、寄存器或者其他硬件和编译器优化)
对于多核CPU的系统架构,每个核包括ALU计算单元+PC+Registers(寄存器)+L1缓存+L2缓存,同一个CPU所有核共享L3缓存
提高程序的并发性能:避免多线程竞争访问主内存影响性能,减少线程安全问题
提高执行效率:使CPU更容易重排序
为了提高性能,从Java源代码到最终实际执行的指令序列经历3种重排序,指令重排序后会导致内存可见性问题和一致性问题
编译器优化重排序。编译器在不改变单线程程序语义的前提下重新安排指令顺序
处理器的重排序。为了充分利用CPU资源(多核和流水线),处理器可以重新安排指令顺序
内存系统的重排序。由于多级缓存和内存的交互,内存读写操作在不同线程间的可见性可能出现顺序不一致
双重校验单例模式:Singleton instance=new Singleton()对应的JVM指令分为三步:分配内存空间->初始化对象->对象指向分配的内存空间,经过编译器指令重排序,第二、第三步就可能会重排序。
happens-before保证多线程情况下指令重排序后内存可见性问题和一致性问题,定义
如果操作A happens-before 操作B,那么操作A的结果对操作B可见。且操作A在操作B之前执行
两个操作之间存在happens-before关系,并不意味着一定要按照happens-before原则的顺序来执行。只需重排序之后的执行结果与happens-before关系来执行的结果一致
规则
传递性(Transitivity)如果操作A happens-before B,B happens-before C,那么A happens-before C
程序次序规则(Program Order Rule)程序按照书写顺序,前面的操作 happens-before 后面的操作
监视器锁定规则(Monitor Lock Rule)线程对锁对象的解锁happens-before随后另一个线程对该锁对象的加锁
volatile变量规则(Volatile Variable Rule)volatile变量的写操作happens-before该变量的读操作
线程启动规则(Thread Start Rule)线程的 start() 方法 happens-before 线程中的所有操作
线程中断规则(Thread Interruption Rule)线程调用中断interrupt() happens-before检测该线程中断(通过 isInterrupted()、interrupted或捕获InterruptedException)。
线程终止规则(Thread Termination Rule)线程中的所有操作happens-before对此线程的终止检测(通过Thread.join()方法结束、Thread.isAlive()的返回值)
对象终结规则(Finalizer Rule)对象的构造函数结束happens-before该对象的finalize()方法。TOM VS ITF
as-if-serial保证单线程情况下~
语义:不管怎么重排序,单线程程序的执行结果不变。不允许重排序存在数据依赖关系的操作
.4. volatile特性、场景√原理/怎么保证有序性和可见性?√与atomic变量区别?
无上下文切换和调度,不阻塞线程、可见性、有序性:禁止重排序
多线程环境下的单次读写。状态标记变量(停止线程)、单例禁止重排序双重检查
为了实现volatile的内存语义/特性:JMM会限制以下重排序,
当第一/二个操作是volatile读/写时,不管第二/一个操作是什么,都不能重排序。确保volatile读/写之后/前的操作不会被编译器重排序到volatile读/写之前/后。
当第一个操作是volatile写,第二个操作是volatile读时,不能重排序。
编译器在生成字节码时,会在指令序列中插入内存屏障
在volatile读操作的后面插入③LoadLoad、④LoadStore屏障(1)
在volatile写操作的①前、②后插入StoreStore、StoreLoad屏障(2、3)
为什么少了普通读-volatile写禁止重排序?(volatile写前LoadStore屏障)因为一个是普通变量读,一个是volatile的读\写,两个变量之间没有happens-before规则
编译时对有volatile关键字的变量,在汇编代码中加入lock前缀指令(lock addl)锁住缓存行,引起处理器CPU缓存行的数据写回主内存并导致其他处理器的缓存失效(处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期)。当处理器发现本地缓存失效后,就会从内存中重读该变量数据,即可以获取当前最新值(一次读取64字节缓存行)相当于一个内存屏障(一组处理指令),用来实现对内存操作的顺序限制。内存屏障使用sfence mfence lfence等系统原语或者锁总线实现
重排序导致结果逻辑错误的例子
int a = 10;
volatile int flag = 1;
a = 100;
flag = 1;
if (flag == 1) {
System.out.println(a); // 期望打印 100
}
线程 A 先修改变量 a,然后设置 flag 为 1 来通知其他线程。然而,如果写屏障失效,可能发生重排序
flag = 1 可能先执行,而 a = 100 后执行。
线程 B 可能在看到 flag == 1 时,读取到的是变量 a 的旧值 10,而不是 100
volatile不能保证原子性。atomic变量可以保证原子性
.5. synchronized关键字√方法和代码块锁对象区别?怎么保证三性、可重入?同步代码、静态同步方法原理?锁升级原理?优化?和volatile、ReenTrantLock异同?√什么场景下可用volatile替换?
非公平锁、悲观锁、可重入锁、互斥锁、保证原子性、可见性、有序性。用于线程同步,确保在同一时刻只有一个线程可以执行,防止多个线程同时访问共享资源导致的数据不一致问题
同步代码块synchronized (实例对象/this/xxx.class) {} 锁对象是实例对象/类
同步方法synchronized void method(){} 锁对象是当前实例对象
静态同步方法public static synchronized void method(){}锁对象是当前Class类
锁对象属性变化不影响,引用改变影响,一般用final修饰;锁对象不能用字符串常量,可能与类库使用同一把锁
线程加锁前,先清空工作内存中共享变量的值,使用共享变量时需要从主内存中重新读取最新值
线程加锁后,其它线程无法获取主内存中的共享变量
线程解锁前,把共享变量的最新值刷新到主内存中
synchronized 同步的代码块,具有排他性,一次只能被一个线程拥有,所以 synchronized 保证同一时刻,代码是单线程执行的。因为 as-if-serial 语义的存在,单线程的程序能保证最终结果是有序的,
synchronized 保证的有序是执行结果的有序性,而不是防止指令重排的有序性
synchronized锁对象有个计数器,线程获取锁后+1计数,线程执行完毕后-1,直到清零释放锁
同步代码块原理:编译后产生字节码指令monitorenter和monitorexit,插入到同步代码块的开始位置、方法结束处和异常处。异常时锁也能释放,避免死锁
静态/普通同步方法原理:在flag添加方法修饰符ACC_SYNCHRONIZED,表示该方法是同步方法
都是基于监视器Monitor实现,线程执行同步代码块前会尝试获取监视器monitor,如果获取不到锁会一直阻塞,直到锁被另外一个线程释放为止,如果获取锁的线程进入休眠或者阻塞,除非当前线程异常,否则其他线程尝试获取锁必须一直等待
`javap -c -s -v -l SynchronizedDemo.class`
Java对象头里的Mark Word中的锁状态标志位记录了锁的状态,锁升级过程:
无锁:
偏向锁:
偏向锁获取:当第一个线程获取锁时,Mark Word会被设置为偏向锁状态(偏向锁标识位为1,锁标识位为01)并存储该线程ID。同一线程再次请求锁无需再次同步。
轻量级锁:
当另一个线程尝试获取该锁时,偏向锁升级为轻量级锁。它会栈帧中创建一个锁记录(Lock Record),然后通过CAS操作将对象头的Mark Word替换为指向锁记录的指针。如果成功,当前线程获取轻量级锁;如果失败,则使用自旋来获取锁。当一个线程旋超过10次,或者自旋线程数超过CPU核数的一半,如果还没有获取到锁,锁从轻量级升级为重量级锁
重量级锁:(lock comxchg指令)(堆) 由操作系统的Mutex Lock实现,操作系统实现线程之间的切换时需要从用户态转换到内核态,效率低。通过将对象头的Mark Word指向监视器Monitor对象来实现,当锁未被释放时,其他线程试图获取锁时,都会被阻塞住,当持有锁的线程释放锁之后会唤醒这些线程,被唤醒的线程就会进行竞争
引入偏向锁:消除同一线程的反复锁获取和释放的开销
引入轻量级锁:用于线程交替执行同步块时避免线程的阻塞与唤醒
自旋锁:当线程尝试获取轻量级锁失败时会进行自旋,循环检查锁是否可用,以避免立即阻塞。自旋次数根据之前在同一个锁上的自旋时间和锁的状态动态调整。
锁粗化:JVM将单线程中的连续的锁操作合并为一个更大范围的锁操作,减少锁请求的次数。优化循环内连续加锁解锁的情况
锁消除:即时编译器(JIT)编译时通过逃逸分析消除单线程中不必要的锁操作。减少同步开销
volatile用在变量级别;synchronized用在变量、方法、和类级别的
volatile不能保证修改的原子性;synchronized能
volatile不会造成线程的阻塞;synchronized可能造成
volatile变量不会被重排序优化;synchronized则允许更多的优化从volatile特点和用法想区别
都是可重入锁
synchronized关键字可修饰方法、代码块。ReentrantLock是类,提供tryLock和lock方法
synchronized只支持非公平锁,ReentrantLock提供公平锁和非公平锁、等待可中断、选择性通知(锁可以绑定多个条件)等特性
synchronized无需手动释放锁,异常时自动释放锁不会死锁。Lock发生异常不自动释放锁可能死锁,要finally中调用unLock方法释放锁
synchronized通过Java对象头锁标记和Monitor对象实现同步。ReentrantLock通过CAS、AQS和LockSupport(用于阻塞和解除阻塞)实现同步
synchronized只能通过wait和notify/notifyAll方法唤醒一个线程或者全部线程(单条件通知),ReentrantLock能实现多条件通知(可以绑定多个 Condition)用功死实条
1写N读
.6. AQS?ReentrantLock、公平锁原理?
AbstractQueuedSynchronizer用于实现同步器(如锁和信号量)的基础框架
通过被volatile修饰的int状态变量管理同步状态和先进先出等待队列管理等待的线程。每个线程通过CAS操作修改同步状态变量来获得锁,如果修改成功,则表示该线程获得锁,否则将该线程加入到等待队列并阻塞,如果是头结点则唤醒后通过自旋继续尝试获得锁
获取锁方式分为独占方式和共享方式,线程使用独占方式获取了资源,其它线程就会在获取失败后被阻塞。线程使用共享方式获取了资源,其他线程还可以通过CAS的方式进行获取。获取不到同步状态,将加入等待队列并阻塞,如果是头结点则唤醒后通过自旋继续尝试获得锁,解锁时唤醒后继节点
子类通过继承AQS并实现它的方法来管理同步状态
tryAcquire/tryRelease:独占方式尝试获取/释放资源
tryAcquireShared/tryReleaseShared(int arg):共享方式尝试获取/释放资源
isHeldExclusively():该线程是否正在独占资源
互斥锁,可重入锁
//构造方法默认创建非公平锁
ReentrantLock lock = new ReentrantLock(true);
lock.lock();
try{
}catch(Exception ex){
}finally{
lock.unlock();
}
通过计数器来记录锁的持有次数。当线程调用lock方法获取锁时判断锁是否已经被其他线程持有。如果否,当前线程获得锁;如果是则根据锁的公平性策略,可能会被加入到等待队列中。线程首次获取锁时,计数器值加1;如果该线程再次获取锁,继续加+1;每释放一次锁,计数器减1。当线程调用unlock方法时会将持有锁的计数减1,如果计数到达0则释放锁,并唤醒等待队列中的线程来竞争锁
区别:公平锁通过hasQueuedPredecessors方法判断当前线程是否是头节点,不是则获取锁失败
因为AQS唤醒等待队列中的第一个线程,但是新线程可能在同一时刻尝试获取锁,导致多个线程竞争
非公平锁性能好,吞吐量大。非公平锁获取锁的时间不确定,阻塞队列中的线程可能长期处于饥饿状态
.7. 保证原子性/多线程下i++??13个原子操作类√AtomicInteger原理?
使用原子类,如AtomicInteger
使用juc包下的锁,如ReentrantLock,对i++操作加锁
使用synchronized,对i++操作加锁
java.util.concurrent.atomic包中原子操作类提供线程安全地更新变量的方式
原理:CAS原子操作volatile变量
原子更新基本类型类:AtomicBoolean/Integer/Long
char=>强转为int
float=>Float.floatToIntBits/intBitsToFloat(float/int)转为int
double=>Double.doubleToLongBits/longBitsToDouble(double/long) 转为long
~数组某个元素/引用类型:AtomicInteger/Long/ReferenceArray
~引用类型/多个对象属性:AtomicReference、AtomicReferenceFieldUpdater、AtomicMarkableReference:~带有标记位的引用类型。解决ABA
~字段类:AtomicInteger/LongFieldUpdater、AtomicStampedReference:~带有版本号的引用类型(解决ABA)
使用CAS+volatile int实现
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
//compareAndSwapInt是一个native方法,基于CAS来操作int类型变量
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
.8. CountDownLatch√CyclicBarrier区别?
CountDownLatch构造方法传入countDown调用次数n。直到countDown调用次数达到n才从await方法返回。多线程解析一个Excel里多个sheet的数据(或者使用join)
CyclicBarrier构造方法传入await调用次数n,直到await达到n才从await方法返回继续执行。多线程计算数据,最后合并计算结果的场景
等待机制:CountDownLatch允许一个或多个线程等待其他线程完成任务,CyclicBarrier多个线程相互等待,直到所有线程都到达屏障
重复使用:CountDownLatch不能,计数器减为0不能重置,CyclicBarrier能,屏障触发后会重置
参与线程:CountDownLatch任何线程都可以减少计数,等待的线程可以不同,CyclicBarrier参与的线程必须达到指定数量,一起进入屏障
.9. 使用线程池原因?场景?处理流程√创建线程池常用参数?√拒绝策略?√Runnable、Callable√、execute()和submit()方法区别?关闭流程?参数设置?阿里开发规范为什么不允许Executors创建线程池/弊端?异常处理?状态?调优?单机线程池宕机处理?
降低资源消耗:重复用线程降低频繁线程创建和销毁造成的消耗
提高响应速度:不需要等线程创建就能立即执行任务
支持定时、周期性、单线程执行和并发数控制等功能
发送短信,公众号消息;处理9图gif;保存文件(头像、身份证)
如果线程数小于corePoolSize则创建新线程执行任务,否则将任务放入等待队列
如果任务等待队列已满且线程数小于maxPoolSize,则创建新线程处理任务
如果任务队列已满且线程数等于maxPoolSize,则根据拒绝执行策略处理任务
空闲线程会从任务队列中取出任务来执行,当任务执行完毕后会等待下一个任务。当线程空闲时间超出指定时间,且线程数大于核心线程数时,线程被回收
ThreadPoolExecutor构造方法参数
corePoolSize:核心线程数。即使这些线程空闲也不会被回收
maximumPoolSize:当工作队列满了之后线程池的最大线程数
workQueue:保存任务的阻塞队列
RejectedExecutionHandler:拒绝策略。当线程池和工作队列都满了之后对新任务的处理策略
keepAliveTime:线程数超过corePoolSize的线程存活时间
unit:keepAliveTime的单位,如秒
threadFactory:线程工厂类,为线程设置名称,方便定位问题
AbortPolicy抛出RejectedExecutionException拒绝处理新任务(默认)
CallerRunsPolicy使用当前线程执行任务,可能会阻塞主线程
DiscardOldestPolicy丢弃在队列中队首的任务并执行新任务
DiscardPolicy丢弃任务
实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化存储不能处理的任务
Runnable接口的run方法无返回值,不能抛异常
Callable接口的call方法返回值是泛型,能抛出异常,和Future、FutureTask配合用来获取异步执行的结果
execute只能提交Runnable类型任务,无法判断任务执行成功与否;用于提交不需要返回值的任务
submit能提交Runnable和Callable类型任务,用于提交需要返回值的任务、通过Future的get()方法阻塞直到任务完成并获取返回值
shutdown和shutdownNow方法都遍历所有线程并调用interrupt方法中断线程。停止接收新任务,使isShutdown方法返回true,当所有的线程都关闭成功,调用isTerminated方法返回true才表示线程池关闭
shutdown将线程池状态置为shutdown,直到内部正在跑的任务和队列里等待的任务全部执行完成后才停止
shutdownNow将线程池状态置为STOP,尝试将正在跑的任务interrupt中断并忽略队列里等待的任务并最后返回未执行的任务列表
通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法
任务的性质选择合适的线程池大小:N为CPU核数:过小导致任务一直排队。过大导致竞争CPU资源激烈,增加上下文切换的开销
CPU密集型任务(加解密逻辑操作)Ncpu+1,+1是因为可能存在页缺失(有些数据在硬盘中需要多一个线程将数据读入内存)
IO密集型任务(数据库连接,网络传输等)2xNcpu
混合型任务则拆成CPU和IO密集型任务
cpu使用率和负载
内存使用率:队列的大小通过前期计算线程池任务的条数,来合理的设置队列的大小,不宜过小,让其不会溢出,因为溢出会走拒绝策略,多少会影响性能,也会增加复杂度。
下游系统抗并发能力
任务的优先级:高,中和低。PriorityBlockingQueue如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行
任务的执行时间:长,中和短
是否依赖其他系统资源,如数据库连接。线程数应该设置越大
Runtime.getRuntime().availableProcessors()获得CPU个数
选择有界队列并设置大小
尽量使用自定义的线程池,而不用Executors创建线程池
FixedThreadPool(CPU密集任务)和SingleThreadExecutor(串行任务)使用LinkedBlockingQueue任务队列长度为Integer.MAX_VALUE,任务执行时间长,队列堆积导致OOM
CachedThreadPool(大量并发小任务,60秒不执行任务线程将被回收)和ScheduledThreadPool(小量周期性任务)最大线程数量为Integer.MAX_VALUE,任务执行时间长,任务堆积导致创建大量线程导致OOM
trycatch捕获
submit执行,Future.get接受异常
重写ThreadPoolExcutor.afterExcute方法,处理传递的异常引用
实例化时传入自定义ThreadFactory根据线程名定位问题同时设置Thread.uncaughtExceptionHandler处理异常
RUNNING:正常状态
SHUTDOWN/STOP:不接受新的任务提交,继续/中断正在执行任务,继续/不继续处理阻塞队列中的任务,线程池中执行任务为空,队列为空,进入TIDYING状态;(shutdown)
TIDYING:所有任务销毁,workCount为0,转换为此状态时会执行terminated()
TERMINATED:terminated()方法结束后的状态
事前评估->监控/告警->动态调整->事后观察
对任务队列持久化;正在处理任务事务控制;断电之后正在处理任务的回滚,通过日志恢复该次操作;服务器重启后阻塞队列中的数据再加载。
.10. Semaphore、Exchanger、Fork/Join框架?
信号量用于控制并发线程数
流量控制,有限资源,比如数据库连接。
acquire\tryAcquire()()方法\尝试获取一个许可证,用完后调用release()方法归还许可证
SemaphoreTest
public class SemaphoreTest {
private static ExecutorService threadPool = Executors.newFixedThreadPool(30);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("x");
s.release();
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
Exchanger用于线程间的数据交换。两个线程通过exchange方法交换数据,第一个线程会等待第二个线程执行exchange方法才交换数据
遗传算法,交配对象交换数据得出结果
校对工作,两人录入银行信息判断数据是否录入一致
ExchangerTest
public class ExchangerTest {
private static final Exchanger<String> exgr = new Exchanger<String>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "银行流水A"; // A录入银行流水数据
exgr.exchange(A);
} catch (InterruptedException e) {
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "银行流水B"; // B录入银行流水数据
String A = exgr.exchange("B");
System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"
+ A + ",B录入是:" + B);
} catch (InterruptedException e) {
}
}
});
threadPool.shutdown();
}
}
用于并行执行任务的框架,把大任务分割成若干个小任务并汇总每个小任务结果后得到结果
工作窃取算法:大任务拆成多个小任务放到不同的双端队列分别用线程处理。处理完的线程去其它队列取任务来执行(工作窃取),被窃取任务线程从双端队列的头部拿,窃取任务的线程从双端队列的尾部拿,减少线程之间的竞争
Fork/Join框架要实现compute方法,如果任务足够小就直接执行。如果比较大则分割成两个子任务,每个子任务在调用fork方法时又会进compute方法,使用join方法会等待子任务执行完并得到其结果
RecursiveTask
//计算1~n和:设置分割阈值,任务大于阈值就拆分任务 任务有结果,所以要继承 RecursiveTask
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 16; // 阈值
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果任务足够小就计算任务
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork(); // 等待子任务执行完,并得到其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join(); // 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(); //负责计算1+2+3+4
CountTask task = new CountTask(1, 100);
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
}
}