多线程程序设计
约 1077 字大约 4 分钟
容器实现(淘宝?)
实现一个容器,提供add,size方法
写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到达5时,线程2给出提示并结束
容器实现
public class MyContainer{
private List<Object> list = new ArrayList<>();
public void add(Object ele) { list.add(ele); }
public int size() { return list.size(); }
}
Test1
public class Test1 {
public static void main(String[] args) {
MyContainer container = new MyContainer();
final Object lock = new Object();
new Thread(() -> {
synchronized (lock) {
System.out.println("t2 启动");
if (container.size() != 5) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("监测到容器长度为5,线程2立即退出");
lock.notify();
}
}, "t2").start();
// 先启动t2线程,让t2线程进入等待状态
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
synchronized (lock) {
for (int i = 0; i < 10; i++) {
container.add(new Object());
System.out.println("add " + i);
// 当长度为5时,通知 t2 进行退出
if (container.size() == 5) {
lock.notify(); // notify 不会释放锁,即便通知t2,t2也获取不到锁
// 可以在wait一下,将锁释放,再让t2通知t1继续执行
//notify不能唤醒特定线程,由cpu决定的
//notify会让线程从上次的wait的地方继续执行
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "t1").start();
}
}
Test2
public class Test2 {
public static void main(String[] args) {
MyContainer container = new MyContainer();
// Count down 往下数 Latch 门闩
// 门闩不能保证可见性,不是一种同步方式,只是一种线程通信方式,保证不了可见性
// 门闩的等待,不会持有任何锁
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
System.out.println("t2 启动");
if (container.size() != 5) {
try {
latch.await();
// 指定等待时间
//latch.await(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("监测到容器长度为5,线程2立即退出");
}, "t2").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
System.out.println("t1 启动");
for (int i = 0; i < 10; i++) {
container.add(new Object());
System.out.println("add " + i);
// 当长度为5时,撤掉一个门闩,此时门闩为0,门会打开,即t2会继续执行
if (container.size() == 5) {
latch.countDown();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1").start();
}
}
容器实现2?为什么使用while而不使用if?不是有锁吗?为什么会需要循环判断?为什么调用notifyAll而不是notify?
实现固定容量容器,拥有put和get方法,以及getCount方法
能够支持2个生产者线程以及10个消费者线程的阻塞调用
生产者消费者模式
如果调用 get方法时,容器为空,get方法就需要阻塞等待
如果调用 put方法时,容器满了,put方法就需要阻塞等待
实现方式:
在容器已满的情况下,put方法会wait等待,当容器中的元素被消费者消费了一部分就会唤醒所有put方法,
put方法会继续向下执行,直接执行list.add(t),那么多个生产者线程执行list.add()有可能出现数据一致性的问题。
如果使用while则会循环判断,就避免了这些问题。
wait之后锁释放,再次被唤醒时并且得到锁后从list.add()开始执行,会无判断直接加入到容器中。
因为notify有可能再次叫醒同一个生产者线程
wait/notify
public class MyContainer1<T> {
private final LinkedList<T> list = new LinkedList<>();
private final int MAX = 10;
private int count = 0;
public synchronized void put(T t) {
while (MAX == count) { //如果容量最大释放锁等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(t);
++count;
this.notifyAll(); // 通知消费者线程消费
}
public synchronized T get() {
while (list.size() == 0) { // 如果容量为空释放锁等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = list.removeFirst();
count--;
this.notifyAll(); // 通知生产者线程生产
return t;
}
}
Condition
public class MyContainer2<T> {
private final LinkedList<T> list = new LinkedList<>();
private final int MAX = 10;
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();
public synchronized void put(T t) {
lock.lock();
try {
while (MAX == count) {
producer.await();
}
list.add(t);
++count;
consumer.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public synchronized T get() {
lock.lock();
try {
while (list.size() == 0) {
consumer.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
T t = list.removeFirst();
count--;
producer.signalAll();
return t;
}
public static void main(String[] args) {
MyContainer2<String> c = new MyContainer2<>();
// 启动消费者线程
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) {
System.out.println(c.get());
}
}, "c_" + i ).start();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 2; i++) {
new Thread(()->{
for (int j = 0; j < 2; j++) {
c.put(Thread.currentThread().getName() + " " + j);
}
}, "p_" + i).start();
}
}
}