跳至主要內容

多线程程序设计

HeChuangJun约 1077 字大约 4 分钟

容器实现(淘宝?)

实现一个容器,提供add,size方法
写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到达5时,线程2给出提示并结束

容器实现
public class MyContainer{
  private List<Object> list = new ArrayList<>();
  public void add(Object ele) { list.add(ele); }
  public int size() { return list.size(); }
}
Test1
public class Test1 {
    public static void main(String[] args) {
        MyContainer container = new MyContainer();
        final Object lock = new Object();
        new Thread(() -> {
            synchronized (lock) {
                System.out.println("t2 启动");
                if (container.size() != 5) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("监测到容器长度为5,线程2立即退出");
                lock.notify();
            }
        }, "t2").start();

        // 先启动t2线程,让t2线程进入等待状态
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        new Thread(() -> {
            synchronized (lock) {
                for (int i = 0; i < 10; i++) {
                    container.add(new Object());
                    System.out.println("add " + i);
                    // 当长度为5时,通知 t2 进行退出
                    if (container.size() == 5) {
                        lock.notify(); // notify 不会释放锁,即便通知t2,t2也获取不到锁
                        // 可以在wait一下,将锁释放,再让t2通知t1继续执行
                        //notify不能唤醒特定线程,由cpu决定的
                        //notify会让线程从上次的wait的地方继续执行
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "t1").start();
    }

}
Test2
public class Test2 {

    public static void main(String[] args) {

        MyContainer container = new MyContainer();

        // Count down 往下数  Latch 门闩
        // 门闩不能保证可见性,不是一种同步方式,只是一种线程通信方式,保证不了可见性
        // 门闩的等待,不会持有任何锁
        CountDownLatch latch = new CountDownLatch(1);

        new Thread(() -> {
            System.out.println("t2 启动");
            if (container.size() != 5) {
                try {
                    latch.await();
                    // 指定等待时间
                    //latch.await(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("监测到容器长度为5,线程2立即退出");
        }, "t2").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> {
            System.out.println("t1 启动");
            for (int i = 0; i < 10; i++) {
                container.add(new Object());
                System.out.println("add " + i);
                // 当长度为5时,撤掉一个门闩,此时门闩为0,门会打开,即t2会继续执行
                if (container.size() == 5) {
                    latch.countDown();
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "t1").start();
    }

}

容器实现2?为什么使用while而不使用if?不是有锁吗?为什么会需要循环判断?为什么调用notifyAll而不是notify?

实现固定容量容器,拥有put和get方法,以及getCount方法
能够支持2个生产者线程以及10个消费者线程的阻塞调用
生产者消费者模式
如果调用 get方法时,容器为空,get方法就需要阻塞等待
如果调用 put方法时,容器满了,put方法就需要阻塞等待
实现方式:

在容器已满的情况下,put方法会wait等待,当容器中的元素被消费者消费了一部分就会唤醒所有put方法,
put方法会继续向下执行,直接执行list.add(t),那么多个生产者线程执行list.add()有可能出现数据一致性的问题。
如果使用while则会循环判断,就避免了这些问题。

wait之后锁释放,再次被唤醒时并且得到锁后从list.add()开始执行,会无判断直接加入到容器中。

因为notify有可能再次叫醒同一个生产者线程

wait/notify
public class MyContainer1<T> {
    private final LinkedList<T> list = new LinkedList<>();
    private final int MAX = 10;
    private int count = 0;
    public synchronized void put(T t) {
        while (MAX == count) { //如果容量最大释放锁等待
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.add(t);
        ++count;
        this.notifyAll(); // 通知消费者线程消费
    }

    public synchronized T get() {
        while (list.size() == 0) { // 如果容量为空释放锁等待  
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        T t = list.removeFirst();
        count--;
        this.notifyAll(); // 通知生产者线程生产
        return t;
    }
}
Condition
public class MyContainer2<T> {
    private final LinkedList<T> list = new LinkedList<>();
    private final int MAX = 10;
    private int count = 0;

    private Lock lock = new ReentrantLock();
    private Condition producer = lock.newCondition();
    private Condition consumer = lock.newCondition();

    public synchronized void put(T t) {
        lock.lock();
        try {
            while (MAX == count) {
                producer.await();
            }
            list.add(t);
            ++count;
            consumer.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public synchronized T get() {
        lock.lock();
        try {
            while (list.size() == 0) {
                consumer.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        T t = list.removeFirst();
        count--;
        producer.signalAll();
        return t;
    }

    public static void main(String[] args) {
        MyContainer2<String> c = new MyContainer2<>();
        // 启动消费者线程
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 5; j++) {
                    System.out.println(c.get());
                }
            }, "c_" + i ).start();
        }

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (int i = 0; i < 2; i++) {
            new Thread(()->{
                for (int j = 0; j < 2; j++) {
                    c.put(Thread.currentThread().getName() + " " + j);
                }
            }, "p_" + i).start();
        }
    }
}