各类锁
1.公平、非公平锁
- 公平锁:不能插队,先到先得
- 非公平锁:可以插队(默认都是非公平锁,比如
ReentrantLock()
),可以提高效率,但是可能导致某些进程一直拿不到锁
2.可重入锁
允许同一个线程多次获取同一把锁的锁机制。如果一个线程已经获得了某个锁,那么它可以再次获取相同的锁而不被阻塞。可重入锁的主要优势在于避免死锁,同时简化了编程模型。
ReentrantLock
和synchronized
关键字都是可重入锁的实现
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockExample {
private static final ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
Runnable task = () -> {
try {
lock.lock(); // 第一次获取锁
System.out.println(Thread.currentThread().getName() + " acquired the lock.");
// 可重入,再次获取锁
lock.lock(); // 第二次获取锁
System.out.println(Thread.currentThread().getName() + " acquired the lock again.");
// 临界区代码
} finally {
lock.unlock(); // 释放锁
System.out.println(Thread.currentThread().getName() + " released the lock.");
// 可重入,再次释放锁
lock.unlock(); // 第二次释放锁
System.out.println(Thread.currentThread().getName() + " released the lock again.");
}
};
// 启动多个线程
for (int i = 0; i < 3; i++) {
new Thread(task).start();
}
}
}
3.自旋锁
会不断地去请求的一种锁,java中可以利用AtomicReference<>();
的CAS去 实现
public class SpinlockDemo {
// int 0
// Thread null
AtomicReference<Thread> atomicReference = new AtomicReference<>();
// 加锁
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "==> mylock");
// 自旋锁
while (!atomicReference.compareAndSet(null,thread)){
}
}
// 解锁
// 加锁
public void myUnLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "==> myUnlock");
atomicReference.compareAndSet(thread,null);
}
}
函数四大接口
lambda表达式就是针对函数式接口设计的
Function 函数式接口:只有一个方法的接口,比如Runnable()
- Predicate 断定型接口:只有一个方法,且该方法只有一个参数,返回值为Boolean型
- Consumer 消费型接口:只有输入,没有返回值
- Supplier 供给型接口:没有输入参数,只有返回值
https://www.kuangstudy.com/bbs/1640650143068217345
不安全的集合、队列
ArrayList、Set、List、Map
wait/sleep 的区别
1.来自不同类,wait->Object,sleep->Thread
2.锁的释放,wait->释放锁,sleep->不释放锁
3.使用范围,wait->同步代码块,sleep->任何地方
synchronized 和 Lock 区别
- lock是一个接口,而synchronized是java的一个关键字。
- synchronized在发生异常时会自动释放占有的锁,因此不会出现死锁;而lock发生异常时,不会主动释放占有的锁,必须手动来释放锁,可能引起死锁的发生。
- Synchronized 可重入锁,不可以中断的,非公平; Lock ,可重入锁,可以判断锁,非公平(可以自己设置);
- Synchronized适合锁少量的代码同步问题,Lock适合锁大量的同步代码!
- Synchronized 线程1(获得锁,阻塞)、绩程2(等待,傻傻的等) ; Lock锁就不一定会等待下去;
- Synchronized无法判断获取锁的状态,Lock 可以判断是否获取到了锁
synchronized调用锁时锁的谁?
是调用该方式时的实例对象
当static修饰synchronized时锁的谁?
锁的是类,即xxxx.class
CopyOnWriteArrayList<>():
一种可支持并发情况下的ArrayList,比起Vector,用了ReentrantLock来代替Vector里的synchronized。
HashTable被淘汰的原因
HashTable实现线程安全的代价比较大,那就是所有有可能产生竞争的方法里都加上了synchronized,这就导致在出现竞争时,只能一个线程对整个HashTable进行操作,其他线程都需要阻塞等待当前取到锁的线程执行完成,这样效率非常低。
ConcurrentHashMap安全的原因
在JDK1.8版本中采用了CAS+synchronized的方法来保证并发,线程安全,使用数组加链表加红黑树的结构解决hash冲突
- CAS(Compare-And-Swap)是一种多线程同步的原子操作,通常用于实现并发算法。它是一种原子性的操作,意味着在整个操作过程中,其他线程无法干扰或改变被操作的数据。CAS 操作包含三个操作数:一个内存位置(V)、预期原值(A)和新值(B)。操作的意义是:仅当 V 的值等于 A 时,将 V 的值替换为 B;否则,不做任何操作。
- 自旋写入是一种在并发编程中使用的技术,它是基于忙等待的一种策略。当一个线程尝试执行某个操作时,如果由于竞争条件导致操作失败,而该操作是可重试的,线程会不断地重试,而不是放弃 CPU 的执行时间。这种重试的过程称为自旋。
在上述HashMap的代码中,通过CAS操作插入新节点时,如果CAS操作失败(即有其他线程已经在当前位置插入了节点),则会通过自旋不断尝试插入,直到成功。这种方式避免了使用传统的锁机制,如synchronized,减少了线程切换的开销,但也可能导致一定的性能损失,因为线程一直在忙等待。
Callable与Thread的关系
因为Thread()只能通过调用Runnable的实现类来增加进程,所以需要一个Runnable的实现类来装载Callable,故有
Runnabie
- FutureTask<>(Callable)
来装载
Callable的一些注意事项
1、使用get()方法获得结果时可能会造成阻塞,需要等待
2、结果会有缓存, ~如果某次输入一致时(不确定是什么判断的)~,会直接从结果缓存中调用输出
JUC中常用辅助方法
- CountDownLanch:计数器,通过await方法判断计数器是否归0,未归0前会让线程在此等待。
- CyclicBarrier:程序线程数量计数器,到达指定数字时执行指定线程
- Semphore:信号量操作,通过acquire()和release()方法阻塞和释放进程
读写锁和独占共享锁
- 写锁=独占锁(只能一个线程占用)
- 读锁=共享锁(可以多个线程同时占用)
可以使用juc中的ReadWriteLock类去实现
ArrayBlockingQueue阻塞队列
add()
和put()
还是和普通Queue一样正常使用- 可以利用
offer(e)
和poll(e)
存放或取出队列,peek()
检测队首元素可以避免队列报异常。offer(e)
和poll(e)
可以设置超时等待。 - 使用
put()
和take()
可以做阻塞,在队列满了或者空时自动等待阻塞。
创建线程池的方法
- Executors:可以直接使用三大方法创建,但是该方法不安全
- ThreadPoolExecutor:需要利用七大参数自行设定值,工作中一般使用这个
线程池大小的设定
CPU密集型:线程数可以根据cpu的核数设(可以通过
RunTime.getRumtime().avaliableProcessors()
动态获取当前设备的核数)- 每个CPU核心可以同时执行一个线程。如果超过线程的 CPU 核心数量可能会导致频繁的上下文切换,从而大大降低性能。
- IO密集型:设定线程数 > 程序中十分消耗io的线程数
线程池考点
三大方法:Executors的创建方法
- 单线程
newSingleThreadExecutor()
、 - 指定线程数
newFixedThreadPool(int nThreads)
、 - 自动动态线程数
newCachedThreadPool()
- 单线程
七大参数:ThreadPoolExecutor的设定参数
- int corePoo1Size,//核心线程池大小
- int maximumPoolsize,//最大核心线程池大小
- long keepAliveTime,//超时了没有人调用就会释放
- Timeunit unit, // 超时单位
- BlockingQueue
workQueue, //阻塞队列 - ThreadFactory threadF actory, //线程工厂,一般不用
- RejectedExecutionHandler handler // 拒绝策略
四种拒绝策略:ThreadPoolExecutor的方法中
- AbortPolicy() 线程超过最大值就不处理,丢弃该线程
- CallerRunspolicy() 当线程超过最大值时,让传递过来的线程自己执行(比如main线程转递过来就让main自己执行)
- DiscardPolicy() 线程超过最大值就不处理,丢弃该线程,但是不会报异常
- DiscardOldestPolicy() 线程超过最大值时,会去尝试和最早的线程去竞争,如果竞争失败就抛弃
ForkJoin
大数据量才使用
用于分解并行流,通过内部的双端队列,当一个线程执行完后自动帮助其他未完成的进程执行
- 通过Forkjoin.submit()(异步执行,可以通过ForkJoinTask类的.get()方法阻塞获得执行结果)或者Forkjoin.execute(实时)执行一个继承了ForkJoinTask类的方法(或者RecursiveTask)的类即可执行
package forkjoin;
import java.util.concurrent.RecursiveTask;
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
private Long temp = 10000L;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if ((end-start) < temp ){
Long sum = 0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
}else{
long middle = (start + end)/2;//中间值
ForkJoinDemo task1 = new ForkJoinDemo(start,middle);
task1.fork();//拆分任务,把任务压入线程队列
ForkJoinDemo task2 = new ForkJoinDemo(middle+1,end);
task2.fork();//拆分任务,把任务压入线程队列
return task1.join()+task2.join();
}
}
}
三种并行方法:
package forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//test1();//5533
//test2();//3709
test3();//222
}
public static void test1(){
Long sum = 0L;
long start = System.currentTimeMillis();
for (Long i = 1L; i <= 10_0000_0000L; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum = "+sum+" 时间 : "+(end-start));
}
public static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(0L,10_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
Long sum = submit.get();
long end = System.currentTimeMillis();
System.out.println("sum = "+sum+" 时间 : "+(end-start));
}
public static void test3(){
long start = System.currentTimeMillis();
// Stream并行流
long sum = LongStream.rangeClosed(0L,10_0000_0000L).parallel().reduce(0,Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum = "+sum+" 时间 : "+(end-start));
}
}
Java中的异步回调
利用JUC中的Future的实现类CompletableFuture去实现
无返回值CompletableFuture
: - 利用runAsync()方法执行异步任务,输入一个供给型接口的函数
- 利用get()方法获得阻塞结果(即上面的runAsync()方法如果没有执行完成,线程运行到此就会发生阻塞)
有返回值CompletableFuture
: - 利用supplyAsync()方法执行异步任务,输入一个供给型接口的函数
- 利用whenComplete()方法获得运行成功的结果
- 利用exceptionally()方法获得运行失败(报错)的结果
- 利用get()方法获得阻塞结果
package main;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class Demo2 {
public static void main(String[] args) throws Exception {
//没有返回值的runAsync 异步回调
// CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
// try {
// TimeUnit.SECONDS.sleep(2);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println(Thread.currentThread().getName()+"runAsync-->Void");
// });
// System.out.println("1111");
// completableFuture.get();//获得阻塞执行结果
//有返回值的 supplyAsync 异步回调
//分为成功和失败的回调
//失败返回的是错误信息
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+"supplyAsync-->Integer");
int i = 10/0;
return 1024;
});
System.out.println(completableFuture.whenComplete((t,u)->{
System.out.println("t-->"+t);//正常的返回结果
System.out.println("u-->"+u);//错误信息
}).exceptionally((e)->{
System.out.println(e.getMessage());//可获取到错误的返回结果
return 2233;
}).get());
}
}
关于Volatile的三特性
Volatile是一种轻量级的同步机制
1、保证可见性:
JMM同步:
- 线程解锁前,要把共享遍历立刻刷回主存
- 线程加锁前,要先把主存刷新到工作内存中
- 加锁和解锁是同一把锁
- JMM的内存到线程数据交互操作时会出现“脏读”的现象
- 而Volatile会通过读取时加锁保证数据的可见性从而避免“脏读”现象的出现
2、不保证原子性
People p = new Peole();
进程会先分配内存空间、执行构造方法初始化对象、最后把对象指向这个空间,这里多个步骤就说明可能存在原子性被破环的可能。- 写入数据时不加锁,会导致内存数据可能被覆盖
- 可以使用juc中的原子类使得volatile保证原子性
3、禁止指令重排
- 编译器会自动根据程序进行指令重排,有可能导致多线程时因为不同的执行顺序导致获得不对的结果
利用synchronized和volatile保证懒汉式的原子性
private volatile static LazyMan lazyMan; //volatile防止指令重排
//双重检测锁模式的 懒汉式单例 DCL懒汉式
public static LazyMan getInstance(){
if (lazyMan == null) {
synchronized (LazyMan.class){
if (lazyMan == null) {
lazyMan = new LazyMan();//不是一个原子操作
}
}
}
return lazyMan;
};
CAS
- 自带原子性,比较并交换(Compare And Set),java中可以用AtomicXXXX实现,它会不停的去取锁直到可以使用为止,不会让线程死等
- 一次只能保证一个变量的原子性
存在ABA问题
ABA 问题指一个线程中,多次执行cas使得共享变量的值从 A 变为 B,再变回 A 的情况下。
CAS 操作可能会误认为共享变量的值没有发生变化,因为它最终又回到了原始的值 A。这可能导致一些并发问题,因为在整个过程中可能发生了其他线程的操作,而 CAS 操作并未意识到这一点。
原子引用实现乐观锁(尽量不用,因为锁可以代替)
利用AtomicStampedReference<>(initialRef,initialStamp)
设定版本号来实现乐观锁
- 如果该原子引用的泛型是包装类,注意引用问题,它底层在使用cas(
.compareAndSet()
)时使用"=="而不是.equal(); - 比如Integer的包装类数值在
-127~128
时都是从内存中取存在的对象,因此此时“==
”还是可以用于比较的,但是这个范围外的数就会在堆中生成新的对象,导致无论如何对比(使用“==
”对比)都会造成输出是false
JUC 版的生产者消费者问题 Lock.Condition
Condition
则提供了类似 Object.wait()
和 Object.notify()
的功能(分别是.await()
和.signal()
),用于在某些条件发生变化时进行线程间通信。